NIFI-11204: Add configurable retry logic for table commits in PutIceberg processor

This closes #6976.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Mark Bathori 2023-02-21 15:11:19 +01:00 committed by Peter Turcsanyi
parent 7954ff355c
commit e370292d7f
5 changed files with 314 additions and 117 deletions


@@ -107,7 +107,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
} catch (Exception e) {
getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
-session.transfer(flowFile, REL_FAILURE);
+session.transfer(session.penalize(flowFile), REL_FAILURE);
}
}
}


@@ -19,10 +19,12 @@ package org.apache.nifi.processors.iceberg;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PendingUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.Tasks;
@@ -54,6 +56,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
@@ -114,6 +117,42 @@ public class PutIceberg extends AbstractIcebergProcessor {
.addValidator(StandardValidators.LONG_VALIDATOR)
.build();
static final PropertyDescriptor NUMBER_OF_COMMIT_RETRIES = new PropertyDescriptor.Builder()
.name("number-of-commit-retries")
.displayName("Number of Commit Retries")
.description("Number of times to retry a commit before failing.")
.required(true)
.defaultValue("10")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor MINIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder()
.name("minimum-commit-wait-time")
.displayName("Minimum Commit Wait Time")
.description("Minimum time to wait before retrying a commit.")
.required(true)
.defaultValue("100 ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor MAXIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder()
.name("maximum-commit-wait-time")
.displayName("Maximum Commit Wait Time")
.description("Maximum time to wait before retrying a commit.")
.required(true)
.defaultValue("2 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor MAXIMUM_COMMIT_DURATION = new PropertyDescriptor.Builder()
.name("maximum-commit-duration")
.displayName("Maximum Commit Duration")
.description("Total retry timeout period for a commit.")
.required(true)
.defaultValue("30 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the data ingestion was successful.")
@@ -131,7 +170,11 @@ public class PutIceberg extends AbstractIcebergProcessor {
TABLE_NAME,
FILE_FORMAT,
MAXIMUM_FILE_SIZE,
-KERBEROS_USER_SERVICE
+KERBEROS_USER_SERVICE,
+NUMBER_OF_COMMIT_RETRIES,
+MINIMUM_COMMIT_WAIT_TIME,
+MAXIMUM_COMMIT_WAIT_TIME,
+MAXIMUM_COMMIT_DURATION
));
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
@@ -161,7 +204,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
table = loadTable(context);
} catch (Exception e) {
getLogger().error("Failed to load table from catalog", e);
-session.transfer(flowFile, REL_FAILURE);
+session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
@@ -182,7 +225,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
}
final WriteResult result = taskWriter.complete();
-appendDataFiles(table, result);
+appendDataFiles(context, table, result);
} catch (Exception e) {
getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
try {
@@ -193,7 +236,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
getLogger().error("Failed to abort uncommitted data files", ex);
}
-session.transfer(flowFile, REL_FAILURE);
+session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
@@ -223,14 +266,24 @@ public class PutIceberg extends AbstractIcebergProcessor {
/**
* Appends the pending data files to the given {@link Table}.
*
* @param context processor context
* @param table table to append
* @param result datafiles created by the {@link TaskWriter}
*/
-private void appendDataFiles(Table table, WriteResult result) {
+void appendDataFiles(ProcessContext context, Table table, WriteResult result) {
+final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions().asInteger();
+final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final AppendFiles appender = table.newAppend();
Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
-appender.commit();
+Tasks.foreach(appender)
+.exponentialBackoff(minimumCommitWaitTime, maximumCommitWaitTime, maximumCommitDuration, 2.0)
+.retry(numberOfCommitRetries)
+.onlyRetryOn(CommitFailedException.class)
+.run(PendingUpdate::commit);
}
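For reference, a minimal, self-contained sketch of the Iceberg Tasks retry chain used above (illustrative values; assumes iceberg-core is on the classpath). Note that retry(n) allows n retries after the initial attempt, i.e. up to n + 1 commit calls in total, and exponentialBackoff takes the minimum wait, maximum wait and total retry duration in milliseconds plus the scale factor:

import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.util.Tasks;

import java.util.concurrent.atomic.AtomicInteger;

public class CommitRetrySketch {
    public static void main(String[] args) {
        final AtomicInteger attempts = new AtomicInteger();

        Tasks.foreach("commit")                              // single item, mirroring the single AppendFiles update
                .exponentialBackoff(100, 2_000, 30_000, 2.0) // min wait, max wait, total duration (ms), scale factor
                .retry(3)                                    // 3 retries => at most 4 attempts
                .onlyRetryOn(CommitFailedException.class)    // any other exception fails immediately
                .run(item -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new CommitFailedException("simulated concurrent table update");
                    }
                    // the third attempt succeeds
                });

        System.out.println("attempts: " + attempts.get());   // prints: attempts: 3
    }
}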
/**
@@ -253,8 +306,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
*/
void abort(DataFile[] dataFiles, Table table) {
Tasks.foreach(dataFiles)
-.throwFailureWhenFinished()
-.noRetry()
+.retry(3)
.run(file -> table.io().deleteFile(file.path().toString()));
}
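The new properties are configured like any other processor property. Below is a minimal sketch of setting them in a test, assuming the standard nifi-mock TestRunner; the values are hypothetical, and the catalog service, record reader and table configuration that the processor also requires are omitted for brevity:

import org.apache.nifi.processors.iceberg.PutIceberg;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class PutIcebergRetryConfigSketch {
    public static void main(String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(PutIceberg.class);

        // retry-related properties added by this change (example values, not recommendations)
        runner.setProperty("number-of-commit-retries", "5");       // default: 10
        runner.setProperty("minimum-commit-wait-time", "200 ms");  // default: 100 ms
        runner.setProperty("maximum-commit-wait-time", "5 sec");   // default: 2 sec
        runner.setProperty("maximum-commit-duration", "1 min");    // default: 30 sec
    }
}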


@@ -0,0 +1,58 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>PutIceberg</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>PutIceberg</h1>
<h3>Description</h3>
<p>
Iceberg is a high-performance format for huge analytic tables.
The PutIceberg processor is capable of pushing data into Iceberg tables using different types of Iceberg catalog implementations.
</p>
<h3>Commit retry properties</h3>
<p>
Iceberg supports multiple concurrent writes using optimistic concurrency.
The processor's commit retry implementation uses <b>exponential backoff</b> with <b>jitter</b> and a <b>scale factor of 2</b>, and provides the following properties to configure the retry behaviour.
<ul>
<li>
Number of Commit Retries (default: 10) - Number of times the processor will retry committing the new data files before failing.
</li>
<li>
Minimum Commit Wait Time (default: 100 ms) - Minimum time the processor waits before a commit retry attempt.
</li>
<li>
Maximum Commit Wait Time (default: 2 sec) - Maximum time the processor waits before a commit retry attempt; the exponential backoff is capped at this value.
</li>
<li>
Maximum Commit Duration (default: 30 sec) - Maximum total duration the processor spends retrying before failing the commit for the current processor event.
</li>
</ul>
The NiFi-side retry logic is built on top of Iceberg's own commit retry logic, which can be configured through table properties. See: <a href="https://iceberg.apache.org/docs/latest/configuration/#table-behavior-properties">Table behavior properties</a>
</p>
</body>
</html>
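To make the documented defaults concrete, here is a rough sketch of the retry wait schedule they imply (scale factor 2, jitter ignored, so real waits vary slightly); it approximates the behaviour rather than reproducing the processor's actual implementation:

public class BackoffScheduleSketch {
    public static void main(String[] args) {
        final long minWaitMs = 100;        // Minimum Commit Wait Time
        final long maxWaitMs = 2_000;      // Maximum Commit Wait Time
        final long maxDurationMs = 30_000; // Maximum Commit Duration
        final int retries = 10;            // Number of Commit Retries

        long elapsedMs = 0;
        for (int attempt = 0; attempt < retries && elapsedMs < maxDurationMs; attempt++) {
            final long waitMs = Math.min((long) (minWaitMs * Math.pow(2, attempt)), maxWaitMs);
            elapsedMs += waitMs;
            System.out.printf("retry %d: wait %d ms (elapsed %d ms)%n", attempt + 1, waitMs, elapsedMs);
        }
        // waits grow 100, 200, 400, 800, 1600 ms, then stay capped at 2000 ms;
        // retrying stops once the retries or the maximum commit duration are exhausted
    }
}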


@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_DURATION;
import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_WAIT_TIME;
import static org.apache.nifi.processors.iceberg.PutIceberg.MINIMUM_COMMIT_WAIT_TIME;
import static org.apache.nifi.processors.iceberg.PutIceberg.NUMBER_OF_COMMIT_RETRIES;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestDataFileActions {
private static final Namespace NAMESPACE = Namespace.of("default");
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort");
private static final Schema ABORT_SCHEMA = new Schema(
Types.NestedField.required(0, "id", Types.IntegerType.get())
);
private PutIceberg icebergProcessor;
@BeforeEach
public void setUp() {
icebergProcessor = new PutIceberg();
}
@DisabledOnOs(WINDOWS)
@Test
public void testAbortUncommittedFiles() throws IOException {
Table table = initCatalog();
List<RecordField> recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType()));
RecordSchema abortSchema = new SimpleRecordSchema(recordFields);
List<MapRecord> recordList = new ArrayList<>();
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5)));
IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
TaskWriter<Record> taskWriter = taskWriterFactory.create();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
for (MapRecord record : recordList) {
taskWriter.write(recordConverter.convert(record));
}
DataFile[] dataFiles = taskWriter.dataFiles();
// DataFiles written by the taskWriter should exist
for (DataFile dataFile : dataFiles) {
Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString())));
}
icebergProcessor.abort(taskWriter.dataFiles(), table);
// DataFiles shouldn't exist after aborting them
for (DataFile dataFile : dataFiles) {
Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString())));
}
}
@Test
public void testAppenderCommitRetryExceeded() {
ProcessContext context = Mockito.mock(ProcessContext.class);
when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("3", null));
when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
AppendFiles appender = Mockito.mock(AppendFiles.class);
doThrow(CommitFailedException.class).when(appender).commit();
Table table = Mockito.mock(Table.class);
when(table.newAppend()).thenReturn(appender);
// assert the commit action eventually fails after exceeding the number of retries
assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
// verify the commit action was called 4 times: the initial attempt plus the 3 configured retries
verify(appender, times(4)).commit();
}
@SuppressWarnings("unchecked")
@Test
public void testAppenderCommitSucceeded() {
ProcessContext context = Mockito.mock(ProcessContext.class);
when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("3", null));
when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
AppendFiles appender = Mockito.mock(AppendFiles.class);
// the commit action should throw exception 2 times before succeeding
doThrow(CommitFailedException.class, CommitFailedException.class).doNothing().when(appender).commit();
Table table = Mockito.mock(Table.class);
when(table.newAppend()).thenReturn(appender);
// the call should not throw an exception since the configured number of retries exceeds the number of failed commit attempts
icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build());
// verify the commit action was called the expected number of times (two failures followed by one success)
verify(appender, times(3)).commit();
}
@Test
public void testMaxCommitDurationExceeded() {
ProcessContext context = Mockito.mock(ProcessContext.class);
when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("5", null));
when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 ms", null));
AppendFiles appender = Mockito.mock(AppendFiles.class);
doThrow(CommitFailedException.class).when(appender).commit();
Table table = Mockito.mock(Table.class);
when(table.newAppend()).thenReturn(appender);
// assert the commit action eventually fails once the maximum commit duration is exceeded
assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
// verify the commit action was called only 2 times instead of the configured 5
verify(appender, times(2)).commit();
}
private Table initCatalog() throws IOException {
TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
Catalog catalog = catalogService.getCatalog();
return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
}
}


@@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.types.Types;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
public class TestFileAbort {
private static final Namespace NAMESPACE = Namespace.of("default");
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort");
private static final Schema ABORT_SCHEMA = new Schema(
Types.NestedField.required(0, "id", Types.IntegerType.get())
);
@DisabledOnOs(WINDOWS)
@Test
public void abortUncommittedFiles() throws IOException {
Table table = initCatalog();
List<RecordField> recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType()));
RecordSchema abortSchema = new SimpleRecordSchema(recordFields);
List<MapRecord> recordList = new ArrayList<>();
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4)));
recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5)));
IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
TaskWriter<Record> taskWriter = taskWriterFactory.create();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
for (MapRecord record : recordList) {
taskWriter.write(recordConverter.convert(record));
}
DataFile[] dataFiles = taskWriter.dataFiles();
// DataFiles written by the taskWriter should exist
for (DataFile dataFile : dataFiles) {
Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString())));
}
PutIceberg icebergProcessor = new PutIceberg();
icebergProcessor.abort(taskWriter.dataFiles(), table);
// DataFiles shouldn't exist after aborting them
for (DataFile dataFile : dataFiles) {
Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString())));
}
}
private Table initCatalog() throws IOException {
TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
Catalog catalog = catalogService.getCatalog();
return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
}
}