diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java index 1b1e058090..83f765df4a 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java @@ -41,14 +41,10 @@ public class IcebergCatalogFactory { } public Catalog create() { - switch (catalogService.getCatalogType()) { - case HIVE: - return initHiveCatalog(catalogService); - case HADOOP: - return initHadoopCatalog(catalogService); - default: - throw new IllegalArgumentException("Unknown catalog type: " + catalogService.getCatalogType()); - } + return switch (catalogService.getCatalogType()) { + case HIVE -> initHiveCatalog(catalogService); + case HADOOP -> initHadoopCatalog(catalogService); + }; } private Catalog initHiveCatalog(IcebergCatalogService catalogService) { diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java index 794d2c5f37..0df1a7c2a2 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java @@ -187,8 +187,7 @@ public class GenericDataConverters { if (data == null) { return null; } - if (data instanceof BigDecimal) { - BigDecimal bigDecimal = (BigDecimal) data; + if (data instanceof BigDecimal bigDecimal) { Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data); Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s", precision, scale, bigDecimal.precision(), data); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index f94453789f..57bb698e2f 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -33,7 +33,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -59,8 +58,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -123,7 +120,7 @@ public class PutIceberg extends AbstractIcebergProcessor { .displayName("Unmatched Column Behavior") .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.") .allowableValues(UnmatchedColumnBehavior.class) - .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue()) + .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN) .required(true) .build(); @@ -132,10 +129,7 @@ public class PutIceberg extends AbstractIcebergProcessor { .displayName("File Format") .description("File format to use when writing Iceberg data files." + " If not set, then the 'write.format.default' table property will be used, default value is parquet.") - .allowableValues( - new AllowableValue("AVRO"), - new AllowableValue("PARQUET"), - new AllowableValue("ORC")) + .allowableValues(new FileFormat[]{FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC}) .build(); static final PropertyDescriptor MAXIMUM_FILE_SIZE = new PropertyDescriptor.Builder() @@ -192,7 +186,7 @@ public class PutIceberg extends AbstractIcebergProcessor { .description("A FlowFile is routed to this relationship after the data ingestion was successful.") .build(); - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + private static final List PROPERTIES = List.of( RECORD_READER, CATALOG, CATALOG_NAMESPACE, @@ -205,12 +199,9 @@ public class PutIceberg extends AbstractIcebergProcessor { MINIMUM_COMMIT_WAIT_TIME, MAXIMUM_COMMIT_WAIT_TIME, MAXIMUM_COMMIT_DURATION - )); + ); - public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, - REL_FAILURE - ))); + public static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); @Override protected List getSupportedPropertyDescriptors() { @@ -295,8 +286,7 @@ public class PutIceberg extends AbstractIcebergProcessor { final FileFormat format = getFileFormat(table.properties(), fileFormat); final IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize); taskWriter = taskWriterFactory.create(); - final UnmatchedColumnBehavior unmatchedColumnBehavior = - UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue()); + final UnmatchedColumnBehavior unmatchedColumnBehavior = context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).asDescribedValue(UnmatchedColumnBehavior.class); final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format, unmatchedColumnBehavior, getLogger()); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index c064c723f0..743ce0cbe6 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -512,7 +512,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); assertEquals(results.size(), 1); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); @@ -555,7 +555,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); assertEquals(results.size(), 1); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); @@ -598,7 +598,7 @@ public class TestIcebergRecordConverter { results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); assertEquals(results.size(), 1); - resultRecord = results.get(0); + resultRecord = results.getFirst(); assertNull(resultRecord.get(0, String.class)); assertNull(resultRecord.get(1, Integer.class)); assertNull(resultRecord.get(2, Float.class)); @@ -641,7 +641,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); assertEquals(results.size(), 1); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); @@ -699,7 +699,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, COMPATIBLE_PRIMITIVES_SCHEMA, tempFile.toInputFile()); assertEquals(results.size(), 1); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); @@ -737,8 +737,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, STRUCT_SCHEMA, tempFile.toInputFile()); assertEquals(1, results.size()); - assertInstanceOf(GenericRecord.class, results.get(0)); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); assertEquals(1, resultRecord.size()); assertInstanceOf(GenericRecord.class, resultRecord.get(0)); @@ -767,8 +766,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, LIST_SCHEMA, tempFile.toInputFile()); assertEquals(1, results.size()); - assertInstanceOf(GenericRecord.class, results.get(0)); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); assertEquals(1, resultRecord.size()); assertInstanceOf(List.class, resultRecord.get(0)); @@ -797,8 +795,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, MAP_SCHEMA, tempFile.toInputFile()); assertEquals(1, results.size()); - assertInstanceOf(GenericRecord.class, results.get(0)); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); assertEquals(1, resultRecord.size()); assertInstanceOf(Map.class, resultRecord.get(0)); @@ -837,8 +834,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, CASE_INSENSITIVE_SCHEMA, tempFile.toInputFile()); assertEquals(1, results.size()); - assertInstanceOf(GenericRecord.class, results.get(0)); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); assertEquals("Text1", resultRecord.get(0, String.class)); assertEquals("Text2", resultRecord.get(1, String.class)); @@ -861,8 +857,7 @@ public class TestIcebergRecordConverter { List results = readFrom(format, UNORDERED_SCHEMA, tempFile.toInputFile()); assertEquals(1, results.size()); - assertInstanceOf(GenericRecord.class, results.get(0)); - GenericRecord resultRecord = results.get(0); + GenericRecord resultRecord = results.getFirst(); assertEquals("value1", resultRecord.get(0, String.class)); @@ -926,15 +921,12 @@ public class TestIcebergRecordConverter { } private ArrayList readFrom(FileFormat format, Schema schema, InputFile inputFile) throws IOException { - switch (format) { - case AVRO: - return readFromAvro(schema, inputFile); - case ORC: - return readFromOrc(schema, inputFile); - case PARQUET: - return readFromParquet(schema, inputFile); - } - throw new IOException("Unknown file format: " + format); + return switch (format) { + case AVRO -> readFromAvro(schema, inputFile); + case ORC -> readFromOrc(schema, inputFile); + case PARQUET -> readFromParquet(schema, inputFile); + default -> throw new IOException("Unknown file format: " + format); + }; } private void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException { diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java index 49ce684302..484dfc79cc 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java @@ -138,7 +138,7 @@ public class TestPutIcebergWithHadoopCatalog { Table table = catalog.loadTable(TABLE_IDENTIFIER); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); Assertions.assertTrue(table.spec().isPartitioned()); Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT)); @@ -165,7 +165,7 @@ public class TestPutIcebergWithHadoopCatalog { Table table = catalog.loadTable(TABLE_IDENTIFIER); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); Assertions.assertTrue(table.spec().isPartitioned()); Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT)); @@ -193,7 +193,7 @@ public class TestPutIcebergWithHadoopCatalog { Table table = catalog.loadTable(TABLE_IDENTIFIER); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); Assertions.assertTrue(table.spec().isPartitioned()); Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT)); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java index 0a0cd8ef96..c61925050c 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java @@ -169,7 +169,7 @@ public class TestPutIcebergWithHiveCatalog { .build(); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); String tableLocation = new URI(table.location()).getPath(); assertTrue(table.spec().isPartitioned()); @@ -206,7 +206,7 @@ public class TestPutIcebergWithHiveCatalog { .build(); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); String tableLocation = new URI(table.location()).getPath(); assertTrue(table.spec().isPartitioned()); @@ -244,7 +244,7 @@ public class TestPutIcebergWithHiveCatalog { .build(); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); String tableLocation = new URI(table.location()).getPath(); assertTrue(table.spec().isPartitioned()); @@ -286,7 +286,7 @@ public class TestPutIcebergWithHiveCatalog { .build(); runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst(); assertTrue(table.spec().isUnpartitioned()); assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT)); @@ -299,7 +299,7 @@ public class TestPutIcebergWithHiveCatalog { private void assertProvenanceEvents() { final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(1, provenanceEvents.size()); - final ProvenanceEventRecord sendEvent = provenanceEvents.get(0); + final ProvenanceEventRecord sendEvent = provenanceEvents.getFirst(); assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType()); assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + TABLE_NAME)); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java index ceb2a486c9..69faa47047 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -88,7 +87,7 @@ public class IcebergTestUtils { List dataFiles = Files.walk(Paths.get(tableLocation + "/data")) .filter(Files::isRegularFile) .filter(path -> !path.getFileName().toString().startsWith(".")) - .collect(Collectors.toList()); + .toList(); assertEquals(numberOfDataFiles, dataFiles.size()); }