diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index e5142c3345..20f67093ab 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -37,6 +37,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.processor.ProcessContext;
@@ -91,6 +92,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .name("catalog-namespace")
             .displayName("Catalog Namespace")
             .description("The namespace of the catalog.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
@@ -98,7 +100,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
     static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
             .name("table-name")
             .displayName("Table Name")
-            .description("The name of the table.")
+            .description("The name of the Iceberg table to write to.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
@@ -119,6 +122,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("Maximum File Size")
             .description("The maximum size that a file can be, if the file size is exceeded a new file will be generated with the remaining data."
                     + " If not set, then the 'write.target-file-size-bytes' table property will be used, default value is 512 MB.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.LONG_VALIDATOR)
             .build();
 
@@ -127,6 +131,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("Number of Commit Retries")
             .description("Number of times to retry a commit before failing.")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .defaultValue("10")
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
             .build();
@@ -136,6 +141,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("Minimum Commit Wait Time")
             .description("Minimum time to wait before retrying a commit.")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .defaultValue("100 ms")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
@@ -145,6 +151,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("Maximum Commit Wait Time")
             .description("Maximum time to wait before retrying a commit.")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .defaultValue("2 sec")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
@@ -154,6 +161,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .displayName("Maximum Commit Duration")
             .description("Total retry timeout period for a commit.")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .defaultValue("30 sec")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
@@ -225,13 +233,13 @@ public class PutIceberg extends AbstractIcebergProcessor {
     @Override
     public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final String fileFormat = context.getProperty(FILE_FORMAT).evaluateAttributeExpressions().getValue();
-        final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions().getValue();
+        final String fileFormat = context.getProperty(FILE_FORMAT).getValue();
+        final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue();
 
         Table table;
 
         try {
-            table = loadTable(context);
+            table = loadTable(context, flowFile);
         } catch (Exception e) {
             getLogger().error("Failed to load table from catalog", e);
             session.transfer(session.penalize(flowFile), REL_FAILURE);
@@ -255,7 +263,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             }
 
             final WriteResult result = taskWriter.complete();
-            appendDataFiles(context, table, result);
+            appendDataFiles(context, flowFile, table, result);
         } catch (Exception e) {
             getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
             try {
@@ -280,10 +288,10 @@ public class PutIceberg extends AbstractIcebergProcessor {
      * @param context holds the user provided information for the {@link Catalog} and the {@link Table}
      * @return loaded table
      */
-    private Table loadTable(PropertyContext context) {
+    private Table loadTable(final PropertyContext context, final FlowFile flowFile) {
         final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
-        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions().getValue();
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
         final Catalog catalog = catalogService.getCatalog();
 
@@ -300,11 +308,11 @@ public class PutIceberg extends AbstractIcebergProcessor {
      * @param table   table to append
      * @param result  datafiles created by the {@link TaskWriter}
      */
-    void appendDataFiles(ProcessContext context, Table table, WriteResult result) {
-        final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions().asInteger();
-        final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+    void appendDataFiles(ProcessContext context, FlowFile flowFile, Table table, WriteResult result) {
+        final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions(flowFile).asInteger();
+        final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 
         final AppendFiles appender = table.newAppend();
         Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
index dc001799f9..ee6b4c0e19 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
@@ -40,6 +41,7 @@ import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockPropertyValue;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -129,6 +131,7 @@ public class TestDataFileActions {
         when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
         when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
 
+        FlowFile mockFlowFile = new MockFlowFile(1234567890L);
         AppendFiles appender = Mockito.mock(AppendFiles.class);
         doThrow(CommitFailedException.class).when(appender).commit();
 
@@ -136,7 +139,7 @@ public class TestDataFileActions {
         when(table.newAppend()).thenReturn(appender);
 
         // assert the commit action eventually fails after exceeding the number of retries
-        assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
+        assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, mockFlowFile, table, WriteResult.builder().build()));
 
         // verify the commit action was called the configured number of times
         verify(appender, times(4)).commit();
@@ -151,6 +154,7 @@ public class TestDataFileActions {
         when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
         when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
 
+        FlowFile mockFlowFile = new MockFlowFile(1234567890L);
         AppendFiles appender = Mockito.mock(AppendFiles.class);
         // the commit action should throw exception 2 times before succeeding
         doThrow(CommitFailedException.class, CommitFailedException.class).doNothing().when(appender).commit();
@@ -159,7 +163,7 @@ public class TestDataFileActions {
         when(table.newAppend()).thenReturn(appender);
 
         // the method call shouldn't throw exception since the configured number of retries is higher than the number of failed commit actions
-        icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build());
+        icebergProcessor.appendDataFiles(context, mockFlowFile, table, WriteResult.builder().build());
 
         // verify the proper number of commit action was called
         verify(appender, times(3)).commit();
@@ -173,6 +177,7 @@ public class TestDataFileActions {
         when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null));
         when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 ms", null));
 
+        FlowFile mockFlowFile = new MockFlowFile(1234567890L);
         AppendFiles appender = Mockito.mock(AppendFiles.class);
         doThrow(CommitFailedException.class).when(appender).commit();
 
@@ -180,7 +185,7 @@ public class TestDataFileActions {
         when(table.newAppend()).thenReturn(appender);
 
         // assert the commit action eventually fails after exceeding duration of maximum retries
-        assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
+        assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, mockFlowFile, table, WriteResult.builder().build()));
 
         // verify the commit action was called only 2 times instead of the configured 5
         verify(appender, times(2)).commit();
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index fb5f5ce41a..3ec70c3d37 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -256,10 +256,14 @@ public class TestPutIcebergWithHiveCatalog {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
         initCatalog(PartitionSpec.unpartitioned(), fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
-        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
-        runner.setValidateExpressionUsage(false);
-        runner.enqueue(new byte[0]);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
+        runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
+        runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("catalog.name", CATALOG_NAME);
+        attributes.put("table.name", TABLE_NAME);
+        attributes.put("max.filesize", "536870912"); // 512 MB
+        runner.enqueue(new byte[0], attributes);
         runner.run();
 
         Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
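
A rough sketch of the flow-level behavior the new FLOWFILE_ATTRIBUTES scope enables, not part of the patch. It is written in the style of TestPutIcebergWithHiveCatalog and assumes the record reader and Hive catalog controller services are registered the way that test's existing helpers do it, that the example tables 'orders' and 'users' already exist in the catalog, and that REL_SUCCESS is the processor's existing success relationship; the attribute names and the test method name are illustrative only.

    // Hypothetical companion test (needs java.util.Map/HashMap and the nifi-mock TestRunner imports
    // already present in the test class).
    @Test
    public void onTriggerRoutesEachFlowFileToItsOwnTable() {
        TestRunner runner = TestRunners.newTestRunner(new PutIceberg());
        // ... record reader and catalog controller services registered here, as in the existing tests ...

        // Catalog Namespace and Table Name now resolve Expression Language against each FlowFile's attributes.
        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
        runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");

        // Two FlowFiles, two target tables, one processor instance.
        Map<String, String> ordersTarget = new HashMap<>();
        ordersTarget.put("catalog.name", "test_metastore"); // example namespace
        ordersTarget.put("table.name", "orders");           // example table, assumed to exist
        runner.enqueue(new byte[0], ordersTarget);

        Map<String, String> usersTarget = new HashMap<>();
        usersTarget.put("catalog.name", "test_metastore");
        usersTarget.put("table.name", "users");              // example table, assumed to exist
        runner.enqueue(new byte[0], usersTarget);

        runner.run(2);
        runner.assertAllFlowFilesTransferred(PutIceberg.REL_SUCCESS, 2);
    }

The point of the sketch: the namespace, table name, maximum file size, and the commit retry/wait settings can now vary per FlowFile instead of being fixed at processor configuration time, while File Format deliberately stays a processor-level setting (hence the switch from evaluateAttributeExpressions() to getValue() for FILE_FORMAT in doOnTrigger).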