NIFI-11552: Support FlowFile Attributes in some PutIceberg properties

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7268.
Matthew Burgess authored 2023-05-18 18:17:59 -04:00; committed by Pierre Villard
parent df04c60e01
commit adb8420b48
3 changed files with 37 additions and 20 deletions
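The commit enables NiFi Expression Language with FlowFile-attribute scope on several PutIceberg properties (catalog namespace, table name, maximum file size, and the commit retry/wait settings) and evaluates them against the incoming FlowFile. The standalone processor below is a minimal sketch of that pattern, not part of the commit: the class name AttributeDrivenExample and the target-table property are invented for illustration, while the API calls (PropertyDescriptor.Builder, ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, evaluateAttributeExpressions(flowFile)) are the same ones the diff itself uses.

// Illustrative sketch only -- not part of this commit. The processor and property names
// are invented; the pattern is the one the commit applies to PutIceberg: declare
// FLOWFILE_ATTRIBUTES Expression Language scope on the descriptor, then resolve the
// property against the incoming FlowFile in onTrigger.
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

public class AttributeDrivenExample extends AbstractProcessor {

    // Hypothetical property; mirrors how TABLE_NAME is declared in the diff.
    static final PropertyDescriptor TARGET_TABLE = new PropertyDescriptor.Builder()
            .name("target-table")
            .displayName("Target Table")
            .description("Table to write to; may reference FlowFile attributes, e.g. ${table.name}.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(TARGET_TABLE);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Passing the FlowFile makes ${table.name} resolve from that FlowFile's attributes,
        // which is what PutIceberg now does for its catalog, table, and commit properties.
        final String tableName = context.getProperty(TARGET_TABLE).evaluateAttributeExpressions(flowFile).getValue();
        getLogger().info("Resolved target table: " + tableName);
        session.transfer(flowFile, REL_SUCCESS);
    }
}

In a running flow, an upstream processor such as UpdateAttribute can set an attribute like table.name on each FlowFile, so the newly attribute-aware PutIceberg properties can resolve to different values per FlowFile through a single processor instance.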

PutIceberg.java

@@ -37,6 +37,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.processor.ProcessContext;
@@ -91,6 +92,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.name("catalog-namespace")
.displayName("Catalog Namespace")
.description("The namespace of the catalog.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@@ -98,7 +100,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("table-name")
.displayName("Table Name")
.description("The name of the table.")
.description("The name of the Iceberg table to write to.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@@ -119,6 +122,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.displayName("Maximum File Size")
.description("The maximum size that a file can be, if the file size is exceeded a new file will be generated with the remaining data." +
" If not set, then the 'write.target-file-size-bytes' table property will be used, default value is 512 MB.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.LONG_VALIDATOR)
.build();
@@ -127,6 +131,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.displayName("Number of Commit Retries")
.description("Number of times to retry a commit before failing.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("10")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
@@ -136,6 +141,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.displayName("Minimum Commit Wait Time")
.description("Minimum time to wait before retrying a commit.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("100 ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
@@ -145,6 +151,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.displayName("Maximum Commit Wait Time")
.description("Maximum time to wait before retrying a commit.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("2 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
@@ -154,6 +161,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.displayName("Maximum Commit Duration")
.description("Total retry timeout period for a commit.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("30 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
@@ -225,13 +233,13 @@ public class PutIceberg extends AbstractIcebergProcessor {
@Override
public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String fileFormat = context.getProperty(FILE_FORMAT).evaluateAttributeExpressions().getValue();
final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions().getValue();
final String fileFormat = context.getProperty(FILE_FORMAT).getValue();
final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue();
Table table;
try {
table = loadTable(context);
table = loadTable(context, flowFile);
} catch (Exception e) {
getLogger().error("Failed to load table from catalog", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
@@ -255,7 +263,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
}
final WriteResult result = taskWriter.complete();
appendDataFiles(context, table, result);
appendDataFiles(context, flowFile, table, result);
} catch (Exception e) {
getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
try {
@@ -280,10 +288,10 @@ public class PutIceberg extends AbstractIcebergProcessor {
* @param context holds the user provided information for the {@link Catalog} and the {@link Table}
* @return loaded table
*/
private Table loadTable(PropertyContext context) {
private Table loadTable(final PropertyContext context, final FlowFile flowFile) {
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Catalog catalog = catalogService.getCatalog();
@@ -300,11 +308,11 @@ public class PutIceberg extends AbstractIcebergProcessor {
* @param table table to append
* @param result datafiles created by the {@link TaskWriter}
*/
void appendDataFiles(ProcessContext context, Table table, WriteResult result) {
final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions().asInteger();
final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
void appendDataFiles(ProcessContext context, FlowFile flowFile, Table table, WriteResult result) {
final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions(flowFile).asInteger();
final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
final AppendFiles appender = table.newAppend();
Arrays.stream(result.dataFiles()).forEach(appender::appendFile);

TestDataFileActions.java

@@ -31,6 +31,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
@@ -40,6 +41,7 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -129,6 +131,7 @@ public class TestDataFileActions {
when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
FlowFile mockFlowFile = new MockFlowFile(1234567890L);
AppendFiles appender = Mockito.mock(AppendFiles.class);
doThrow(CommitFailedException.class).when(appender).commit();
@@ -136,7 +139,7 @@ public class TestDataFileActions {
when(table.newAppend()).thenReturn(appender);
// assert the commit action eventually fails after exceeding the number of retries
assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, mockFlowFile, table, WriteResult.builder().build()));
// verify the commit action was called the configured number of times
verify(appender, times(4)).commit();
@@ -151,6 +154,7 @@ public class TestDataFileActions {
when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
FlowFile mockFlowFile = new MockFlowFile(1234567890L);
AppendFiles appender = Mockito.mock(AppendFiles.class);
// the commit action should throw exception 2 times before succeeding
doThrow(CommitFailedException.class, CommitFailedException.class).doNothing().when(appender).commit();
@@ -159,7 +163,7 @@ public class TestDataFileActions {
when(table.newAppend()).thenReturn(appender);
// the method call shouldn't throw exception since the configured number of retries is higher than the number of failed commit actions
icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build());
icebergProcessor.appendDataFiles(context, mockFlowFile, table, WriteResult.builder().build());
// verify the proper number of commit action was called
verify(appender, times(3)).commit();
@@ -173,6 +177,7 @@ public class TestDataFileActions {
when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null));
when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 ms", null));
FlowFile mockFlowFile = new MockFlowFile(1234567890L);
AppendFiles appender = Mockito.mock(AppendFiles.class);
doThrow(CommitFailedException.class).when(appender).commit();
@@ -180,7 +185,7 @@ public class TestDataFileActions {
when(table.newAppend()).thenReturn(appender);
// assert the commit action eventually fails after exceeding duration of maximum retries
assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, mockFlowFile, table, WriteResult.builder().build()));
// verify the commit action was called only 2 times instead of the configured 5
verify(appender, times(2)).commit();

TestPutIcebergWithHiveCatalog.java

@@ -256,10 +256,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(PartitionSpec.unpartitioned(), fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");
Map<String,String> attributes = new HashMap<>();
attributes.put("catalog.name", CATALOG_NAME);
attributes.put("table.name", TABLE_NAME);
attributes.put("max.filesize", "536870912"); // 512 MB
runner.enqueue(new byte[0], attributes);
runner.run();
Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);