diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java index 7dc53eefd7..a4ec2ccf07 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java @@ -107,7 +107,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor { } catch (Exception e) { getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); } } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index cdd2997a58..1af97768f3 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -19,10 +19,12 @@ package org.apache.nifi.processors.iceberg; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PendingUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.util.Tasks; @@ -54,6 +56,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; @@ -114,6 +117,42 @@ public class PutIceberg extends AbstractIcebergProcessor { .addValidator(StandardValidators.LONG_VALIDATOR) .build(); + static final PropertyDescriptor NUMBER_OF_COMMIT_RETRIES = new PropertyDescriptor.Builder() + .name("number-of-commit-retries") + .displayName("Number of Commit Retries") + .description("Number of times to retry a commit before failing.") + .required(true) + .defaultValue("10") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor MINIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder() + .name("minimum-commit-wait-time") + .displayName("Minimum Commit Wait Time") + .description("Minimum time to wait before retrying a commit.") + .required(true) + .defaultValue("100 ms") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor MAXIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder() + .name("maximum-commit-wait-time") + .displayName("Maximum Commit Wait Time") + .description("Maximum time to wait before retrying a commit.") + .required(true) + .defaultValue("2 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) 
+ .build(); + + static final PropertyDescriptor MAXIMUM_COMMIT_DURATION = new PropertyDescriptor.Builder() + .name("maximum-commit-duration") + .displayName("Maximum Commit Duration") + .description("Total retry timeout period for a commit.") + .required(true) + .defaultValue("30 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after the data ingestion was successful.") @@ -131,7 +170,11 @@ public class PutIceberg extends AbstractIcebergProcessor { TABLE_NAME, FILE_FORMAT, MAXIMUM_FILE_SIZE, - KERBEROS_USER_SERVICE + KERBEROS_USER_SERVICE, + NUMBER_OF_COMMIT_RETRIES, + MINIMUM_COMMIT_WAIT_TIME, + MAXIMUM_COMMIT_WAIT_TIME, + MAXIMUM_COMMIT_DURATION )); public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( @@ -161,7 +204,7 @@ public class PutIceberg extends AbstractIcebergProcessor { table = loadTable(context); } catch (Exception e) { getLogger().error("Failed to load table from catalog", e); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); return; } @@ -182,7 +225,7 @@ public class PutIceberg extends AbstractIcebergProcessor { } final WriteResult result = taskWriter.complete(); - appendDataFiles(table, result); + appendDataFiles(context, table, result); } catch (Exception e) { getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e); try { @@ -193,7 +236,7 @@ public class PutIceberg extends AbstractIcebergProcessor { getLogger().error("Failed to abort uncommitted data files", ex); } - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); return; } @@ -223,14 +266,24 @@ public class PutIceberg extends AbstractIcebergProcessor { /** * Appends the pending data files to the given {@link Table}. 
* + * @param context processor context * @param table table to append * @param result datafiles created by the {@link TaskWriter} */ - private void appendDataFiles(Table table, WriteResult result) { + void appendDataFiles(ProcessContext context, Table table, WriteResult result) { + final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions().asInteger(); + final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + final AppendFiles appender = table.newAppend(); Arrays.stream(result.dataFiles()).forEach(appender::appendFile); - appender.commit(); + Tasks.foreach(appender) + .exponentialBackoff(minimumCommitWaitTime, maximumCommitWaitTime, maximumCommitDuration, 2.0) + .retry(numberOfCommitRetries) + .onlyRetryOn(CommitFailedException.class) + .run(PendingUpdate::commit); } /** @@ -253,8 +306,7 @@ public class PutIceberg extends AbstractIcebergProcessor { */ void abort(DataFile[] dataFiles, Table table) { Tasks.foreach(dataFiles) - .throwFailureWhenFinished() - .noRetry() + .retry(3) .run(file -> table.io().deleteFile(file.path().toString())); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html new file mode 100644 index 0000000000..06844910f4 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html @@ -0,0 +1,58 @@ + + + + + + + PutIceberg + + + + + +

+        <h2>PutIceberg</h2>
+
+        <h3>Description</h3>
+
+        <p>
+            Iceberg is a high-performance format for huge analytic tables.
+            The PutIceberg processor is capable of pushing data into Iceberg tables using different types of Iceberg catalog implementations.
+        </p>
+
+        <h3>Commit retry properties</h3>
+
+        <p>
+            Iceberg supports multiple concurrent writes using optimistic concurrency.
+            The processor's commit retry implementation uses exponential backoff with jitter and a scale factor of 2, and provides the following properties to configure the retry behaviour:
+            Number of Commit Retries, Minimum Commit Wait Time, Maximum Commit Wait Time and Maximum Commit Duration.
+        </p>
+
+        <p>
+            The NiFi side retry logic is built on top of the Iceberg commit retry logic, which can be configured through table properties.
+            See more: Table behavior properties
+        </p>

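As a companion to the note above, here is a minimal, hypothetical sketch (not part of this PR) of how the Iceberg-side commit retry behaviour can be tuned through table properties; the property keys are taken from org.apache.iceberg.TableProperties and should be verified against the Iceberg version in use. The PutIceberg retry settings apply on top of whatever the table itself is configured to do.

import org.apache.iceberg.Table;

// Sketch only: adjusting the Iceberg-level commit retry table properties
// that the NiFi-side retry logic builds on top of.
public class IcebergCommitRetryProperties {

    static void configure(Table table) {
        table.updateProperties()
                .set("commit.retry.num-retries", "4")            // retries before a commit fails
                .set("commit.retry.min-wait-ms", "100")          // initial wait between retries
                .set("commit.retry.max-wait-ms", "60000")        // cap on the wait between retries
                .set("commit.retry.total-timeout-ms", "1800000") // total retry budget per commit
                .commit();
    }
}

The retry added in this PR (Tasks.foreach(appender).exponentialBackoff(...).retry(...)) then wraps these Iceberg-level retries from the NiFi side.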
+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java new file mode 100644 index 0000000000..dc001799f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.iceberg; + +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.types.Types; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService; +import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter; +import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_DURATION; +import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_WAIT_TIME; +import static org.apache.nifi.processors.iceberg.PutIceberg.MINIMUM_COMMIT_WAIT_TIME; +import static org.apache.nifi.processors.iceberg.PutIceberg.NUMBER_OF_COMMIT_RETRIES; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.condition.OS.WINDOWS; +import static org.mockito.Mockito.doThrow; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestDataFileActions { + + private static final Namespace NAMESPACE = Namespace.of("default"); + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort"); + + private static final Schema ABORT_SCHEMA = new Schema( + Types.NestedField.required(0, "id", Types.IntegerType.get()) + ); + + private PutIceberg icebergProcessor; + + @BeforeEach + public void setUp() { + icebergProcessor = new PutIceberg(); + } + + @DisabledOnOs(WINDOWS) + @Test + public void testAbortUncommittedFiles() throws IOException { + Table table = initCatalog(); + + List recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType())); + RecordSchema abortSchema = new SimpleRecordSchema(recordFields); + + List recordList = new ArrayList<>(); + recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1))); + recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2))); + recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3))); + recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4))); + recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5))); + + IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null); + TaskWriter taskWriter = taskWriterFactory.create(); + + IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET); + + for (MapRecord record : recordList) { + taskWriter.write(recordConverter.convert(record)); + } + + DataFile[] dataFiles = taskWriter.dataFiles(); + + // DataFiles written by the taskWriter should exist + for (DataFile dataFile : dataFiles) { + Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString()))); + } + + icebergProcessor.abort(taskWriter.dataFiles(), table); + + // DataFiles shouldn't exist after aborting them + for (DataFile dataFile : dataFiles) { + Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString()))); + } + } + + @Test + public void testAppenderCommitRetryExceeded() { + ProcessContext context = Mockito.mock(ProcessContext.class); + when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("3", null)); + when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null)); + when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null)); + when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null)); + + AppendFiles appender = Mockito.mock(AppendFiles.class); + doThrow(CommitFailedException.class).when(appender).commit(); + + Table table = Mockito.mock(Table.class); + when(table.newAppend()).thenReturn(appender); + + // assert the commit action eventually fails after exceeding the number of retries + assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build())); + + // verify the commit action was called the configured number of times + verify(appender, times(4)).commit(); + } + + @SuppressWarnings("unchecked") + @Test + public void testAppenderCommitSucceeded() { + ProcessContext context = Mockito.mock(ProcessContext.class); + when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("3", null)); + 
when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null)); + when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null)); + when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null)); + + AppendFiles appender = Mockito.mock(AppendFiles.class); + // the commit action should throw exception 2 times before succeeding + doThrow(CommitFailedException.class, CommitFailedException.class).doNothing().when(appender).commit(); + + Table table = Mockito.mock(Table.class); + when(table.newAppend()).thenReturn(appender); + + // the method call shouldn't throw exception since the configured number of retries is higher than the number of failed commit actions + icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()); + + // verify the proper number of commit action was called + verify(appender, times(3)).commit(); + } + + @Test + public void testMaxCommitDurationExceeded() { + ProcessContext context = Mockito.mock(ProcessContext.class); + when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("5", null)); + when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null)); + when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null)); + when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 ms", null)); + + AppendFiles appender = Mockito.mock(AppendFiles.class); + doThrow(CommitFailedException.class).when(appender).commit(); + + Table table = Mockito.mock(Table.class); + when(table.newAppend()).thenReturn(appender); + + // assert the commit action eventually fails after exceeding duration of maximum retries + assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build())); + + // verify the commit action was called only 2 times instead of the configured 5 + verify(appender, times(2)).commit(); + } + + private Table initCatalog() throws IOException { + TestHadoopCatalogService catalogService = new TestHadoopCatalogService(); + Catalog catalog = catalogService.getCatalog(); + + return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned()); + } +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java deleted file mode 100644 index 0b5403986b..0000000000 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.iceberg; - -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.types.Types; -import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService; -import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter; -import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -import static org.junit.jupiter.api.condition.OS.WINDOWS; - -public class TestFileAbort { - - private static final Namespace NAMESPACE = Namespace.of("default"); - private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort"); - - private static final Schema ABORT_SCHEMA = new Schema( - Types.NestedField.required(0, "id", Types.IntegerType.get()) - ); - - @DisabledOnOs(WINDOWS) - @Test - public void abortUncommittedFiles() throws IOException { - Table table = initCatalog(); - - List recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType())); - RecordSchema abortSchema = new SimpleRecordSchema(recordFields); - - List recordList = new ArrayList<>(); - recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1))); - recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2))); - recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3))); - recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4))); - recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5))); - - IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null); - TaskWriter taskWriter = taskWriterFactory.create(); - - IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET); - - for (MapRecord record : recordList) { - taskWriter.write(recordConverter.convert(record)); - } - - DataFile[] dataFiles = taskWriter.dataFiles(); - - // DataFiles written by the taskWriter should exist - for (DataFile dataFile : dataFiles) { - Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString()))); - } - - PutIceberg icebergProcessor = new PutIceberg(); - icebergProcessor.abort(taskWriter.dataFiles(), table); - - // DataFiles shouldn't exist after aborting them - for (DataFile dataFile : dataFiles) { - Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString()))); - } - } - - private Table 
initCatalog() throws IOException { - TestHadoopCatalogService catalogService = new TestHadoopCatalogService(); - Catalog catalog = catalogService.getCatalog(); - - return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned()); - } -}
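
For reference, a small, hypothetical sketch of how the default commit retry settings introduced in this PR (10 retries, 100 ms minimum wait, 2 sec maximum wait, scale factor 2) pace the waits between attempts; the actual org.apache.iceberg.util.Tasks implementation additionally applies random jitter and enforces the 30 sec Maximum Commit Duration cut-off.

// Sketch only, not part of the PR: expected (pre-jitter) wait progression
// for the default PutIceberg commit retry settings.
public class CommitBackoffSketch {

    public static void main(String[] args) {
        final long minWaitMs = 100;    // Minimum Commit Wait Time default
        final long maxWaitMs = 2_000;  // Maximum Commit Wait Time default
        final int retries = 10;        // Number of Commit Retries default

        for (int attempt = 0; attempt < retries; attempt++) {
            // exponential backoff with scale factor 2, capped at the maximum wait time
            long waitMs = Math.min((long) (minWaitMs * Math.pow(2, attempt)), maxWaitMs);
            System.out.println("retry " + (attempt + 1) + ": ~" + waitMs + " ms");
        }
    }
}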