NIFI-12545 Updated nifi-iceberg-bundle using current API methods

This closes #8186

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Authored by EndzeitBegins on 2023-12-23 19:50:01 +01:00; committed by exceptionfactory
parent 6bca79cb37
commit 499b63e544
7 changed files with 36 additions and 60 deletions

@@ -41,14 +41,10 @@ public class IcebergCatalogFactory {
     }
 
     public Catalog create() {
-        switch (catalogService.getCatalogType()) {
-            case HIVE:
-                return initHiveCatalog(catalogService);
-            case HADOOP:
-                return initHadoopCatalog(catalogService);
-            default:
-                throw new IllegalArgumentException("Unknown catalog type: " + catalogService.getCatalogType());
-        }
+        return switch (catalogService.getCatalogType()) {
+            case HIVE -> initHiveCatalog(catalogService);
+            case HADOOP -> initHadoopCatalog(catalogService);
+        };
     }
 
     private Catalog initHiveCatalog(IcebergCatalogService catalogService) {
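
Note: the hunk above replaces the statement-style switch (with a default branch that threw IllegalArgumentException) by an exhaustive switch expression over the catalog-type enum, so the compiler flags any unhandled constant. A minimal standalone sketch of the idiom, using an illustrative enum rather than the NiFi catalog types:

public class SwitchExpressionSketch {

    enum CatalogType { HIVE, HADOOP }

    // Exhaustive switch expression (Java 14+): every constant is handled,
    // so no default branch or unreachable exception is needed.
    static String describe(CatalogType type) {
        return switch (type) {
            case HIVE -> "catalog backed by a Hive metastore";
            case HADOOP -> "catalog backed by a Hadoop warehouse path";
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(CatalogType.HADOOP));
    }
}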

@@ -187,8 +187,7 @@ public class GenericDataConverters {
             if (data == null) {
                 return null;
             }
-            if (data instanceof BigDecimal) {
-                BigDecimal bigDecimal = (BigDecimal) data;
+            if (data instanceof BigDecimal bigDecimal) {
                 Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
                 Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
                         precision, scale, bigDecimal.precision(), data);
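
Note: the change above switches to pattern matching for instanceof (Java 16+), binding the BigDecimal in the same check and dropping the explicit cast. A small self-contained sketch of the same idiom; the method and messages are illustrative, not the NiFi converter logic:

import java.math.BigDecimal;

public class InstanceofPatternSketch {

    // The type test and the cast are combined: 'bigDecimal' is only in
    // scope on the branch where 'data' really is a BigDecimal.
    static String describeScale(Object data) {
        if (data instanceof BigDecimal bigDecimal) {
            return "BigDecimal with scale " + bigDecimal.scale();
        }
        return "not a BigDecimal: " + data;
    }

    public static void main(String[] args) {
        System.out.println(describeScale(new BigDecimal("12.345")));
        System.out.println(describeScale("plain text"));
    }
}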

@@ -33,7 +33,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -59,8 +58,6 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -123,7 +120,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("Unmatched Column Behavior")
             .description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.")
             .allowableValues(UnmatchedColumnBehavior.class)
-            .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
+            .defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN)
             .required(true)
             .build();
 
@@ -132,10 +129,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("File Format")
             .description("File format to use when writing Iceberg data files." +
                     " If not set, then the 'write.format.default' table property will be used, default value is parquet.")
-            .allowableValues(
-                    new AllowableValue("AVRO"),
-                    new AllowableValue("PARQUET"),
-                    new AllowableValue("ORC"))
+            .allowableValues(new FileFormat[]{FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC})
             .build();
 
     static final PropertyDescriptor MAXIMUM_FILE_SIZE = new PropertyDescriptor.Builder()
@@ -192,7 +186,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .description("A FlowFile is routed to this relationship after the data ingestion was successful.")
             .build();
 
-    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
             RECORD_READER,
             CATALOG,
             CATALOG_NAMESPACE,
@@ -205,12 +199,9 @@ public class PutIceberg extends AbstractIcebergProcessor {
             MINIMUM_COMMIT_WAIT_TIME,
             MAXIMUM_COMMIT_WAIT_TIME,
             MAXIMUM_COMMIT_DURATION
-    ));
+    );
 
-    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-            REL_SUCCESS,
-            REL_FAILURE
-    )));
+    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -295,8 +286,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             final FileFormat format = getFileFormat(table.properties(), fileFormat);
             final IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize);
             taskWriter = taskWriterFactory.create();
-            final UnmatchedColumnBehavior unmatchedColumnBehavior =
-                    UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+            final UnmatchedColumnBehavior unmatchedColumnBehavior = context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).asDescribedValue(UnmatchedColumnBehavior.class);
             final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format, unmatchedColumnBehavior, getLogger());
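
Note: the PutIceberg hunks above replace Collections.unmodifiableList(Arrays.asList(...)) and Collections.unmodifiableSet(new HashSet<>(...)) with the immutable factory methods List.of and Set.of (Java 9+), and move the Unmatched Column Behavior property to the enum-based allowableValues/defaultValue/asDescribedValue overloads visible in the diff. A minimal sketch of the collection factories; the string values are placeholders, not the processor's actual descriptors:

import java.util.List;
import java.util.Set;

public class ImmutableFactorySketch {

    // List.of and Set.of return compact, unmodifiable collections;
    // mutation attempts throw UnsupportedOperationException.
    static final List<String> PROPERTIES = List.of("record-reader", "catalog", "table-name");
    static final Set<String> RELATIONSHIPS = Set.of("success", "failure");

    public static void main(String[] args) {
        System.out.println(PROPERTIES);
        System.out.println(RELATIONSHIPS.contains("success"));
    }
}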

@@ -512,7 +512,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
 
         assertEquals(results.size(), 1);
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
         OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
@@ -555,7 +555,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
 
         assertEquals(results.size(), 1);
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
         OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
@@ -598,7 +598,7 @@ public class TestIcebergRecordConverter {
         results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
         assertEquals(results.size(), 1);
-        resultRecord = results.get(0);
+        resultRecord = results.getFirst();
 
         assertNull(resultRecord.get(0, String.class));
         assertNull(resultRecord.get(1, Integer.class));
         assertNull(resultRecord.get(2, Float.class));
@@ -641,7 +641,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
 
         assertEquals(results.size(), 1);
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
         OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
@@ -699,7 +699,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, COMPATIBLE_PRIMITIVES_SCHEMA, tempFile.toInputFile());
 
         assertEquals(results.size(), 1);
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
         OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
@@ -737,8 +737,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, STRUCT_SCHEMA, tempFile.toInputFile());
 
         assertEquals(1, results.size());
-        assertInstanceOf(GenericRecord.class, results.get(0));
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         assertEquals(1, resultRecord.size());
         assertInstanceOf(GenericRecord.class, resultRecord.get(0));
@@ -767,8 +766,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, LIST_SCHEMA, tempFile.toInputFile());
 
         assertEquals(1, results.size());
-        assertInstanceOf(GenericRecord.class, results.get(0));
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         assertEquals(1, resultRecord.size());
         assertInstanceOf(List.class, resultRecord.get(0));
@@ -797,8 +795,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, MAP_SCHEMA, tempFile.toInputFile());
 
         assertEquals(1, results.size());
-        assertInstanceOf(GenericRecord.class, results.get(0));
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         assertEquals(1, resultRecord.size());
         assertInstanceOf(Map.class, resultRecord.get(0));
@@ -837,8 +834,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, CASE_INSENSITIVE_SCHEMA, tempFile.toInputFile());
 
         assertEquals(1, results.size());
-        assertInstanceOf(GenericRecord.class, results.get(0));
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         assertEquals("Text1", resultRecord.get(0, String.class));
         assertEquals("Text2", resultRecord.get(1, String.class));
@@ -861,8 +857,7 @@ public class TestIcebergRecordConverter {
         List<GenericRecord> results = readFrom(format, UNORDERED_SCHEMA, tempFile.toInputFile());
 
         assertEquals(1, results.size());
-        assertInstanceOf(GenericRecord.class, results.get(0));
-        GenericRecord resultRecord = results.get(0);
+        GenericRecord resultRecord = results.getFirst();
 
         assertEquals("value1", resultRecord.get(0, String.class));
@@ -926,15 +921,12 @@ public class TestIcebergRecordConverter {
     }
 
     private ArrayList<GenericRecord> readFrom(FileFormat format, Schema schema, InputFile inputFile) throws IOException {
-        switch (format) {
-            case AVRO:
-                return readFromAvro(schema, inputFile);
-            case ORC:
-                return readFromOrc(schema, inputFile);
-            case PARQUET:
-                return readFromParquet(schema, inputFile);
-        }
-        throw new IOException("Unknown file format: " + format);
+        return switch (format) {
+            case AVRO -> readFromAvro(schema, inputFile);
+            case ORC -> readFromOrc(schema, inputFile);
+            case PARQUET -> readFromParquet(schema, inputFile);
+            default -> throw new IOException("Unknown file format: " + format);
+        };
     }
 
     private void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
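
Note: the test changes above replace results.get(0) with results.getFirst(), which java.util.List inherits from the SequencedCollection interface introduced in Java 21. A small sketch; unlike get(0), getFirst() throws NoSuchElementException rather than IndexOutOfBoundsException on an empty list:

import java.util.List;

public class GetFirstSketch {

    public static void main(String[] args) {
        List<String> results = List.of("first", "second");

        // Equivalent to results.get(0) for a non-empty list.
        System.out.println(results.getFirst());
    }
}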

@@ -138,7 +138,7 @@ public class TestPutIcebergWithHadoopCatalog {
         Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
 
         Assertions.assertTrue(table.spec().isPartitioned());
         Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
@@ -165,7 +165,7 @@ public class TestPutIcebergWithHadoopCatalog {
         Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
 
         Assertions.assertTrue(table.spec().isPartitioned());
         Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
@@ -193,7 +193,7 @@ public class TestPutIcebergWithHadoopCatalog {
         Table table = catalog.loadTable(TABLE_IDENTIFIER);
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
 
         Assertions.assertTrue(table.spec().isPartitioned());
         Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));

@@ -169,7 +169,7 @@ public class TestPutIcebergWithHiveCatalog {
                 .build();
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
         String tableLocation = new URI(table.location()).getPath();
 
         assertTrue(table.spec().isPartitioned());
@@ -206,7 +206,7 @@ public class TestPutIcebergWithHiveCatalog {
                 .build();
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
         String tableLocation = new URI(table.location()).getPath();
 
         assertTrue(table.spec().isPartitioned());
@@ -244,7 +244,7 @@ public class TestPutIcebergWithHiveCatalog {
                 .build();
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
         String tableLocation = new URI(table.location()).getPath();
 
         assertTrue(table.spec().isPartitioned());
@@ -286,7 +286,7 @@ public class TestPutIcebergWithHiveCatalog {
                 .build();
 
         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
 
         assertTrue(table.spec().isUnpartitioned());
         assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
@@ -299,7 +299,7 @@ public class TestPutIcebergWithHiveCatalog {
     private void assertProvenanceEvents() {
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
         assertEquals(1, provenanceEvents.size());
-        final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+        final ProvenanceEventRecord sendEvent = provenanceEvents.getFirst();
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + TABLE_NAME));
     }

@@ -34,7 +34,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,7 +87,7 @@ public class IcebergTestUtils {
         List<Path> dataFiles = Files.walk(Paths.get(tableLocation + "/data"))
                 .filter(Files::isRegularFile)
                 .filter(path -> !path.getFileName().toString().startsWith("."))
-                .collect(Collectors.toList());
+                .toList();
 
         assertEquals(numberOfDataFiles, dataFiles.size());
     }
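
Note: the final hunk drops the Collectors import and ends the stream with toList() (Java 16+), which returns an unmodifiable list, whereas collect(Collectors.toList()) makes no such guarantee. A minimal sketch of the same file-walking pattern over a temporary directory; the file names are illustrative:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class StreamToListSketch {

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("data");
        Files.createFile(dir.resolve("part-00000.parquet"));
        Files.createFile(dir.resolve(".hidden"));

        // Stream.toList() replaces collect(Collectors.toList()) and yields an unmodifiable List.
        try (Stream<Path> walk = Files.walk(dir)) {
            List<Path> dataFiles = walk
                    .filter(Files::isRegularFile)
                    .filter(path -> !path.getFileName().toString().startsWith("."))
                    .toList();
            System.out.println(dataFiles.size() + " data file(s): " + dataFiles);
        }
    }
}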