diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java new file mode 100644 index 0000000000..094f0daf60 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/UnmatchedColumnBehavior.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.iceberg; + +import org.apache.nifi.components.DescribedValue; + +public enum UnmatchedColumnBehavior implements DescribedValue { + IGNORE_UNMATCHED_COLUMN("Ignore Unmatched Columns", + "Any column in the database that does not have a field in the document will be assumed to not be required. No notification will be logged"), + + WARNING_UNMATCHED_COLUMN("Warn on Unmatched Columns", + "Any column in the database that does not have a field in the document will be assumed to not be required. A warning will be logged"), + + FAIL_UNMATCHED_COLUMN("Fail on Unmatched Columns", + "The flow will fail if any column in the database does not have a field in the document. 
An error will be logged"); + + + private final String displayName; + private final String description; + + UnmatchedColumnBehavior(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java index c8ee7bd171..794d2c5f37 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java @@ -21,9 +21,11 @@ import org.apache.commons.lang3.Validate; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; @@ -265,9 +267,16 @@ public class GenericDataConverters { for (DataConverter converter : converters) { final Optional recordField = recordSchema.getField(converter.getSourceFieldName()); - final RecordField field = recordField.get(); - // creates a record field accessor for every data converter - getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable())); + if (recordField.isEmpty()) { + final Types.NestedField missingField = schema.field(converter.getTargetFieldName()); + if (missingField != null) { + getters.put(converter.getTargetFieldName(), createFieldGetter(convertSchemaTypeToDataType(missingField.type()), missingField.name(), missingField.isOptional())); + } + } else { + final RecordField field = recordField.get(); + // creates a record field accessor for every data converter + getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable())); + } } } @@ -290,4 +299,54 @@ public class GenericDataConverters { return converter.convert((S) getters.get(converter.getTargetFieldName()).getFieldOrNull(record)); } } + + public static DataType convertSchemaTypeToDataType(Type schemaType) { + switch (schemaType.typeId()) { + case BOOLEAN: + return RecordFieldType.BOOLEAN.getDataType(); + case INTEGER: + return RecordFieldType.INT.getDataType(); + case LONG: + return RecordFieldType.LONG.getDataType(); + case FLOAT: + return RecordFieldType.FLOAT.getDataType(); + case DOUBLE: + return RecordFieldType.DOUBLE.getDataType(); + case DATE: + return RecordFieldType.DATE.getDataType(); + case TIME: + return RecordFieldType.TIME.getDataType(); + case TIMESTAMP: + return RecordFieldType.TIMESTAMP.getDataType(); + case STRING: + return RecordFieldType.STRING.getDataType(); + case UUID: + return 
RecordFieldType.UUID.getDataType(); + case FIXED: + case BINARY: + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + case DECIMAL: + return RecordFieldType.DECIMAL.getDataType(); + case STRUCT: + // Build a record type from the struct type + Types.StructType structType = schemaType.asStructType(); + List fields = structType.fields(); + List recordFields = new ArrayList<>(fields.size()); + for (Types.NestedField field : fields) { + DataType dataType = convertSchemaTypeToDataType(field.type()); + recordFields.add(new RecordField(field.name(), dataType, field.isOptional())); + } + RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return RecordFieldType.RECORD.getRecordDataType(recordSchema); + case LIST: + // Build a list type from the elements + Types.ListType listType = schemaType.asListType(); + return RecordFieldType.ARRAY.getArrayDataType(convertSchemaTypeToDataType(listType.elementType()), listType.isElementOptional()); + case MAP: + // Build a map type from the elements + Types.MapType mapType = schemaType.asMapType(); + return RecordFieldType.MAP.getMapDataType(convertSchemaTypeToDataType(mapType.valueType()), mapType.isValueOptional()); + } + throw new IllegalArgumentException("Invalid or unsupported type: " + schemaType); + } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java index 33049123ef..49ff37f475 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java @@ -25,6 +25,8 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.schema.SchemaWithPartnerVisitor; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.iceberg.UnmatchedColumnBehavior; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -46,19 +48,26 @@ import java.util.stream.Collectors; public class IcebergRecordConverter { private final DataConverter converter; + public final UnmatchedColumnBehavior unmatchedColumnBehavior; + public ComponentLog logger; + public GenericRecord convert(Record record) { return converter.convert(record); } + @SuppressWarnings("unchecked") - public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) { - this.converter = (DataConverter) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat); + public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) { + this.converter = (DataConverter) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat, unmatchedColumnBehavior, logger); + this.unmatchedColumnBehavior = unmatchedColumnBehavior; + this.logger = logger; } private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor> { - public static DataConverter visit(Schema schema, RecordDataType recordDataType, 
FileFormat fileFormat) { - return visit(schema, new RecordTypeWithFieldNameMapper(schema, recordDataType), new IcebergSchemaVisitor(), new IcebergPartnerAccessors(schema, fileFormat)); + public static DataConverter visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) { + return visit(schema, new RecordTypeWithFieldNameMapper(schema, recordDataType), new IcebergSchemaVisitor(), + new IcebergPartnerAccessors(schema, fileFormat, unmatchedColumnBehavior, logger)); } @Override @@ -123,8 +132,10 @@ public class IcebergRecordConverter { // set NiFi schema field names (sourceFieldName) in the data converters for (DataConverter converter : converters) { final Optional mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName()); - final Optional recordField = recordSchema.getField(mappedFieldName.get()); - converter.setSourceFieldName(recordField.get().getFieldName()); + if (mappedFieldName.isPresent()) { + final Optional recordField = recordSchema.getField(mappedFieldName.get()); + converter.setSourceFieldName(recordField.get().getFieldName()); + } } return new GenericDataConverters.RecordConverter(converters, recordSchema, type); @@ -144,10 +155,14 @@ public class IcebergRecordConverter { public static class IcebergPartnerAccessors implements SchemaWithPartnerVisitor.PartnerAccessors { private final Schema schema; private final FileFormat fileFormat; + private final UnmatchedColumnBehavior unmatchedColumnBehavior; + private final ComponentLog logger; - IcebergPartnerAccessors(Schema schema, FileFormat fileFormat) { + IcebergPartnerAccessors(Schema schema, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) { this.schema = schema; this.fileFormat = fileFormat; + this.unmatchedColumnBehavior = unmatchedColumnBehavior; + this.logger = logger; } @Override @@ -156,8 +171,25 @@ public class IcebergRecordConverter { final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType; final Optional mappedFieldName = recordType.getNameMapping(name); - Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name)); - + if (UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) { + Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name)); + } + if (mappedFieldName.isEmpty()) { + if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) { + if (logger != null) { + logger.warn("Cannot find field with name '" + name + "' in the record schema, using the target schema for datatype and a null value"); + } + } + // If the field is missing, use the expected type from the schema (converted to a DataType) + final Types.NestedField schemaField = schema.findField(fieldId); + final Type schemaFieldType = schemaField.type(); + if (schemaField.isRequired()) { + // Iceberg requires a non-null value for required fields + throw new IllegalArgumentException("Iceberg requires a non-null value for required fields, field: " + + schemaField.name() + ", type: " + schemaFieldType); + } + return GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType); + } final Optional recordField = recordType.getChildSchema().getField(mappedFieldName.get()); final RecordField field = recordField.get(); diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index 02bd0b074f..22b1ec5507 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -108,6 +108,14 @@ public class PutIceberg extends AbstractIcebergProcessor { .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); + static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder() + .name("unmatched-column-behavior") + .displayName("Unmatched Column Behavior") + .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.") + .allowableValues(UnmatchedColumnBehavior.class) + .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue()) + .required(true) + .build(); static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder() .name("file-format") .displayName("File Format") @@ -178,6 +186,7 @@ public class PutIceberg extends AbstractIcebergProcessor { CATALOG, CATALOG_NAMESPACE, TABLE_NAME, + UNMATCHED_COLUMN_BEHAVIOR, FILE_FORMAT, MAXIMUM_FILE_SIZE, KERBEROS_USER_SERVICE, @@ -256,8 +265,10 @@ public class PutIceberg extends AbstractIcebergProcessor { final FileFormat format = getFileFormat(table.properties(), fileFormat); final IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize); taskWriter = taskWriterFactory.create(); + final UnmatchedColumnBehavior unmatchedColumnBehavior = + UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); - final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format); + final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format, unmatchedColumnBehavior, getLogger()); Record record; while ((record = reader.nextRecord()) != null) { @@ -353,5 +364,4 @@ public class PutIceberg extends AbstractIcebergProcessor { .retry(3) .run(file -> table.io().deleteFile(file.path().toString())); } - } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java index 4e535c3f8a..cc45705eb3 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java @@ -32,6 +32,8 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.types.Types; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.mock.MockComponentLogger; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import 
org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService; @@ -79,10 +81,12 @@ public class TestDataFileActions { ); private PutIceberg icebergProcessor; + private ComponentLog logger; @BeforeEach public void setUp() { icebergProcessor = new PutIceberg(); + logger = new MockComponentLogger(); } @DisabledOnOs(WINDOWS) @@ -103,7 +107,7 @@ public class TestDataFileActions { IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null); TaskWriter taskWriter = taskWriterFactory.create(); - IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); for (MapRecord record : recordList) { taskWriter.write(recordConverter.convert(record)); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index baf220fea2..c064c723f0 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -37,6 +37,8 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.mock.MockComponentLogger; import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter; import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter; import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter; @@ -57,6 +59,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; + import java.io.File; import java.io.IOException; import java.math.BigDecimal; @@ -92,9 +95,12 @@ public class TestIcebergRecordConverter { private OutputFile tempFile; + private ComponentLog logger; + @BeforeEach public void setUp() throws Exception { tempFile = Files.localOutput(createTempFile("test", null)); + logger = new MockComponentLogger(); } @AfterEach @@ -145,6 +151,24 @@ public class TestIcebergRecordConverter { Types.NestedField.optional(14, "choice", Types.IntegerType.get()) ); + private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema( + Types.NestedField.optional(0, "string", Types.StringType.get()), + Types.NestedField.required(1, "integer", Types.IntegerType.get()), + Types.NestedField.required(2, "float", Types.FloatType.get()), + Types.NestedField.required(3, "long", Types.LongType.get()), + Types.NestedField.optional(4, "double", Types.DoubleType.get()), + Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)), + Types.NestedField.optional(6, "boolean", Types.BooleanType.get()), + Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)), + Types.NestedField.optional(8, "binary", Types.BinaryType.get()), + Types.NestedField.optional(9, "date", Types.DateType.get()), + 
Types.NestedField.optional(10, "time", Types.TimeType.get()), + Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()), + Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()), + Types.NestedField.optional(13, "uuid", Types.UUIDType.get()), + Types.NestedField.optional(14, "choice", Types.IntegerType.get()) + ); + private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema( Types.NestedField.optional(0, "string", Types.StringType.get()), Types.NestedField.optional(1, "integer", Types.IntegerType.get()), @@ -240,6 +264,24 @@ public class TestIcebergRecordConverter { return new SimpleRecordSchema(fields); } + private static RecordSchema getPrimitivesSchemaMissingFields() { + List fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2))); + fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())); + fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); + fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + fields.add(new RecordField("time", RecordFieldType.TIME.getDataType())); + fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType())); + fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType())); + fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType())); + fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()))); + + return new SimpleRecordSchema(fields); + } + private static RecordSchema getPrimitivesAsCompatiblesSchema() { List fields = new ArrayList<>(); fields.add(new RecordField("string", RecordFieldType.INT.getDataType())); @@ -370,6 +412,29 @@ public class TestIcebergRecordConverter { return new MapRecord(getPrimitivesSchema(), values); } + private static Record setupPrimitivesTestRecordMissingFields() { + LocalDate localDate = LocalDate.of(2017, 4, 4); + LocalTime localTime = LocalTime.of(14, 20, 33); + LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); + OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); + + Map values = new HashMap<>(); + values.put("string", "Test String"); + values.put("double", 3.14159D); + values.put("decimal", new BigDecimal("12345678.12")); + values.put("boolean", true); + values.put("fixed", "hello".getBytes()); + values.put("binary", "hello".getBytes()); + values.put("date", localDate); + values.put("time", Time.valueOf(localTime)); + values.put("timestamp", Timestamp.from(offsetDateTime.toInstant())); + values.put("timestampTz", Timestamp.valueOf(localDateTime)); + values.put("uuid", UUID.fromString("0000-00-00-00-000000")); + values.put("choice", "10"); + + return new MapRecord(getPrimitivesSchemaMissingFields(), values); + } + private static Record setupCompatiblePrimitivesTestRecord() { Map values = new HashMap<>(); @@ -439,7 +504,7 @@ public class TestIcebergRecordConverter { RecordSchema nifiSchema = getPrimitivesSchema(); Record record = setupPrimitivesTestRecord(); - IcebergRecordConverter recordConverter = new 
IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile); @@ -472,6 +537,54 @@ public class TestIcebergRecordConverter { } else { assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class)); } + } + + @DisabledOnOs(WINDOWS) + @ParameterizedTest + @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) + public void testPrimitivesIgnoreMissingFields(FileFormat format) throws IOException { + RecordSchema nifiSchema = getPrimitivesSchemaMissingFields(); + Record record = setupPrimitivesTestRecordMissingFields(); + MockComponentLogger mockComponentLogger = new MockComponentLogger(); + + IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger); + GenericRecord genericRecord = recordConverter.convert(record); + + writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile); + + List results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); + + assertEquals(results.size(), 1); + GenericRecord resultRecord = results.get(0); + + LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); + OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); + + assertEquals("Test String", resultRecord.get(0, String.class)); + assertNull(resultRecord.get(1, Integer.class)); + assertNull(resultRecord.get(2, Float.class)); + assertNull(resultRecord.get(3, Long.class)); + assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class)); + assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class)); + assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class)); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class)); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array()); + assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class)); + assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class)); + assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class)); + assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class)); + assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class)); + + if (format.equals(FileFormat.PARQUET)) { + // Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0 + UUID uuid = UUID.fromString("0000-00-00-00-000000"); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(uuid.getMostSignificantBits()); + byteBuffer.putLong(uuid.getLeastSignificantBits()); + assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class)); + } else { + assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class)); + } // Test null values for (String fieldName : record.getRawFieldNames()) { @@ -504,11 +617,81 @@ public class TestIcebergRecordConverter { @DisabledOnOs(WINDOWS) @ParameterizedTest @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testCompatiblePrimitives(FileFormat format) throws 
IOException { + public void testPrimitivesMissingRequiredFields(FileFormat format) { + RecordSchema nifiSchema = getPrimitivesSchemaMissingFields(); + MockComponentLogger mockComponentLogger = new MockComponentLogger(); + + assertThrows(IllegalArgumentException.class, + () -> new IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger)); + } + + @DisabledOnOs(WINDOWS) + @ParameterizedTest + @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) + public void testPrimitivesWarnMissingFields(FileFormat format) throws IOException { + RecordSchema nifiSchema = getPrimitivesSchemaMissingFields(); + Record record = setupPrimitivesTestRecordMissingFields(); + MockComponentLogger mockComponentLogger = new MockComponentLogger(); + + IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN, mockComponentLogger); + GenericRecord genericRecord = recordConverter.convert(record); + + writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile); + + List results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); + + assertEquals(results.size(), 1); + GenericRecord resultRecord = results.get(0); + + LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); + OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); + + assertEquals("Test String", resultRecord.get(0, String.class)); + assertNull(resultRecord.get(1, Integer.class)); + assertNull(resultRecord.get(2, Float.class)); + assertNull(resultRecord.get(3, Long.class)); + assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class)); + assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class)); + assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class)); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class)); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array()); + assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class)); + assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class)); + assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class)); + assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class)); + assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class)); + + if (format.equals(FileFormat.PARQUET)) { + // Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0 + UUID uuid = UUID.fromString("0000-00-00-00-000000"); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(uuid.getMostSignificantBits()); + byteBuffer.putLong(uuid.getLeastSignificantBits()); + assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class)); + } else { + assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class)); + } + } + + @DisabledOnOs(WINDOWS) + @ParameterizedTest + @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) + public void testPrimitivesFailMissingFields(FileFormat format) throws IOException { + RecordSchema nifiSchema = getPrimitivesSchemaMissingFields(); + MockComponentLogger mockComponentLogger = new MockComponentLogger(); + + assertThrows(IllegalArgumentException.class, + () -> 
new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, mockComponentLogger)); + } + + @DisabledOnOs(WINDOWS) + @Test + public void testCompatiblePrimitives() throws IOException { RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema(); Record record = setupCompatiblePrimitivesTestRecord(); + final FileFormat format = PARQUET; - IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile); @@ -536,21 +719,17 @@ public class TestIcebergRecordConverter { assertEquals(expectedLocalDateTimestamp, resultRecord.get(11, LocalDateTime.class)); assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class)); - if (format.equals(PARQUET)) { - assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class)); - } else { - assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(12, UUID.class)); - } + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class)); } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testStruct(FileFormat format) throws IOException { + @Test + public void testStruct() throws IOException { RecordSchema nifiSchema = getStructSchema(); Record record = setupStructTestRecord(); + final FileFormat format = FileFormat.ORC; - IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, STRUCT_SCHEMA, genericRecord, tempFile); @@ -574,13 +753,13 @@ public class TestIcebergRecordConverter { } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testList(FileFormat format) throws IOException { + @Test + public void testList() throws IOException { RecordSchema nifiSchema = getListSchema(); Record record = setupListTestRecord(); + final FileFormat format = FileFormat.AVRO; - IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, LIST_SCHEMA, genericRecord, tempFile); @@ -593,7 +772,7 @@ public class TestIcebergRecordConverter { assertEquals(1, resultRecord.size()); assertInstanceOf(List.class, resultRecord.get(0)); - List nestedList = resultRecord.get(0, List.class); + List nestedList = resultRecord.get(0, List.class); assertEquals(2, nestedList.size()); assertInstanceOf(List.class, nestedList.get(0)); @@ -604,13 +783,13 @@ public class TestIcebergRecordConverter { } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testMap(FileFormat 
format) throws IOException { + @Test + public void testMap() throws IOException { RecordSchema nifiSchema = getMapSchema(); Record record = setupMapTestRecord(); + final FileFormat format = PARQUET; - IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, MAP_SCHEMA, genericRecord, tempFile); @@ -636,20 +815,21 @@ public class TestIcebergRecordConverter { @ParameterizedTest @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) public void testSchemaMismatch(FileFormat format) { - RecordSchema nifiSchema = getPrimitivesSchema(); + RecordSchema nifiSchema = getPrimitivesSchemaMissingFields(); - IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format)); - assertTrue(e.getMessage().contains("Cannot find field with name 'FIELD1' in the record schema"), e.getMessage()); + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, + () -> new IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger)); + assertTrue(e.getMessage().contains("Iceberg requires a non-null value for required fields"), e.getMessage()); } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testCaseInsensitiveFieldMapping(FileFormat format) throws IOException { + @Test + public void testCaseInsensitiveFieldMapping() throws IOException { RecordSchema nifiSchema = getCaseInsensitiveSchema(); Record record = setupCaseInsensitiveTestRecord(); + final FileFormat format = FileFormat.AVRO; - IcebergRecordConverter recordConverter = new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, CASE_INSENSITIVE_SCHEMA, genericRecord, tempFile); @@ -667,13 +847,13 @@ public class TestIcebergRecordConverter { } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testUnorderedFieldMapping(FileFormat format) throws IOException { + @Test + public void testUnorderedFieldMapping() throws IOException { RecordSchema nifiSchema = getUnorderedSchema(); Record record = setupUnorderedTestRecord(); + final FileFormat format = PARQUET; - IcebergRecordConverter recordConverter = new IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format); + IcebergRecordConverter recordConverter = new IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); GenericRecord genericRecord = recordConverter.convert(record); writeTo(format, UNORDERED_SCHEMA, genericRecord, tempFile); @@ -698,7 +878,7 @@ public class TestIcebergRecordConverter { assertEquals("value5", resultRecord.get(2, String.class)); assertInstanceOf(Map.class, resultRecord.get(3)); - Map map = resultRecord.get(3, Map.class); + Map map = resultRecord.get(3, Map.class); assertEquals("map value1", map.get("key1")); 
assertEquals("map value2", map.get("key2")); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java index ff8f5a9a3e..49ce684302 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg; import org.apache.avro.Schema; import org.apache.commons.io.IOUtils; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -38,9 +39,8 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -102,14 +102,14 @@ public class TestPutIcebergWithHadoopCatalog { runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory"); } - private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException { + private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException, IOException { TestHadoopCatalogService catalogService = new TestHadoopCatalogService(); IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService); catalog = catalogFactory.create(); Map tableProperties = new HashMap<>(); tableProperties.put(TableProperties.FORMAT_VERSION, "2"); - tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat); + tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()); catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec, tableProperties); @@ -120,16 +120,15 @@ public class TestPutIcebergWithHadoopCatalog { } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @ValueSource(strings = {"avro", "orc", "parquet"}) - public void onTriggerYearTransform(String fileFormat) throws Exception { + @Test + public void onTriggerYearTransform() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA) .year("date") .build(); runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(spec, fileFormat); + initCatalog(spec, FileFormat.PARQUET); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default"); runner.setProperty(PutIceberg.TABLE_NAME, "date"); runner.setValidateExpressionUsage(false); @@ -148,16 +147,15 @@ public class TestPutIcebergWithHadoopCatalog { } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @ValueSource(strings = {"avro", "orc", "parquet"}) - public void onTriggerMonthTransform(String fileFormat) throws Exception { + @Test + public void onTriggerMonthTransform() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA) .month("timestampMicros") .build(); runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(spec, fileFormat); + initCatalog(spec, FileFormat.ORC); 
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default"); runner.setProperty(PutIceberg.TABLE_NAME, "date"); runner.setValidateExpressionUsage(false); @@ -177,16 +175,15 @@ public class TestPutIcebergWithHadoopCatalog { } @DisabledOnOs(WINDOWS) - @ParameterizedTest - @ValueSource(strings = {"avro", "orc", "parquet"}) - public void onTriggerDayTransform(String fileFormat) throws Exception { + @Test + public void onTriggerDayTransform() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA) .day("timestampMicros") .build(); runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(spec, fileFormat); + initCatalog(spec, FileFormat.AVRO); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default"); runner.setProperty(PutIceberg.TABLE_NAME, "date"); runner.setValidateExpressionUsage(false); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java index bc159ef470..05d140a829 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg; import org.apache.avro.Schema; import org.apache.commons.io.IOUtils; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -44,10 +45,9 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -122,10 +122,10 @@ public class TestPutIcebergWithHiveCatalog { runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory"); } - private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException { + private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException { Map tableProperties = new HashMap<>(); tableProperties.put(TableProperties.FORMAT_VERSION, "2"); - tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat); + tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()); TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder() .withMetastoreUri(metastore.getThriftConnectionUri()) @@ -143,16 +143,15 @@ public class TestPutIcebergWithHiveCatalog { runner.setProperty(PutIceberg.CATALOG, "catalog-service"); } - @ParameterizedTest - @ValueSource(strings = {"avro"}) - public void onTriggerPartitioned(String fileFormat) throws Exception { + @Test + public void onTriggerPartitioned() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA) .bucket("department", 3) .build(); runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(spec, fileFormat); + initCatalog(spec, FileFormat.AVRO); 
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); runner.setValidateExpressionUsage(false); @@ -181,16 +180,15 @@ public class TestPutIcebergWithHiveCatalog { assertProvenanceEvents(); } - @ParameterizedTest - @ValueSource(strings = {"orc"}) - public void onTriggerIdentityPartitioned(String fileFormat) throws Exception { + @Test + public void onTriggerIdentityPartitioned() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA) .identity("department") .build(); runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(spec, fileFormat); + initCatalog(spec, FileFormat.ORC); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); runner.setValidateExpressionUsage(false); @@ -219,9 +217,8 @@ public class TestPutIcebergWithHiveCatalog { assertProvenanceEvents(); } - @ParameterizedTest - @ValueSource(strings = {"parquet"}) - public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception { + @Test + public void onTriggerMultiLevelIdentityPartitioned() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA) .identity("name") .identity("department") @@ -229,7 +226,7 @@ public class TestPutIcebergWithHiveCatalog { runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(spec, fileFormat); + initCatalog(spec, FileFormat.PARQUET); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); runner.setValidateExpressionUsage(false); @@ -262,12 +259,11 @@ public class TestPutIcebergWithHiveCatalog { assertProvenanceEvents(); } - @ParameterizedTest - @ValueSource(strings = {"avro"}) - public void onTriggerUnPartitioned(String fileFormat) throws Exception { + @Test + public void onTriggerUnPartitioned() throws Exception { runner = TestRunners.newTestRunner(processor); initRecordReader(); - initCatalog(PartitionSpec.unpartitioned(), fileFormat); + initCatalog(PartitionSpec.unpartitioned(), FileFormat.AVRO); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}"); runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}"); runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");