NIFI-12130: Ability to configure snapshot properties via dynamic attributes in PutIceberg

Fix dynamic field expression language handling

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #7849
Mark Bathori 2023-10-06 16:02:55 +02:00 committed by Matt Burgess
parent 67bf4434ad
commit 4fa0299f8b
5 changed files with 130 additions and 12 deletions
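For orientation, here is a minimal sketch of how the new capability might be exercised from a NiFi TestRunner. It is illustrative only and not part of this change: the property name 'snapshot-property.ingestion-source', the attribute names and the values are hypothetical, and the required Record Reader and catalog controller services are omitted.

import java.util.Collections;
import java.util.Map;

import org.apache.nifi.processors.iceberg.PutIceberg;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

class SnapshotSummaryPropertySketch {

    void configure() {
        // Sketch only: the record reader and catalog services are not set up here,
        // so this is not a complete, runnable test on its own.
        TestRunner runner = TestRunners.newTestRunner(PutIceberg.class);
        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_namespace");
        runner.setProperty(PutIceberg.TABLE_NAME, "test_table");

        // Dynamic property: the key must carry the 'snapshot-property.' prefix;
        // the value supports FlowFile attribute Expression Language.
        runner.setProperty("snapshot-property.ingestion-source", "${source.system}");

        Map<String, String> attributes = Collections.singletonMap("source.system", "orders-export");
        runner.enqueue(new byte[0], attributes); // record payload omitted in this sketch

        // After a successful commit the snapshot summary would contain
        // 'ingestion-source=orders-export' (prefix stripped) plus 'nifi-flowfile-uuid'.
    }
}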

View File

@@ -17,10 +17,15 @@
*/
package org.apache.nifi.processors.iceberg;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class IcebergUtils {
@@ -39,4 +44,25 @@ public class IcebergUtils {
}
return conf;
}
/**
* Collects every non-blank dynamic property from the context.
*
* @param context process context
* @param flowFile the FlowFile used to evaluate attribute expressions
* @return map of dynamic property names to their evaluated values
*/
public static Map<String, String> getDynamicProperties(ProcessContext context, FlowFile flowFile) {
return context.getProperties().entrySet().stream()
// filter non-blank dynamic properties
.filter(e -> e.getKey().isDynamic()
&& StringUtils.isNotBlank(e.getValue())
&& StringUtils.isNotBlank(context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue())
)
// convert to Map keys and evaluated property values
.collect(Collectors.toMap(
e -> e.getKey().getName(),
e -> context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue()
));
}
}
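A short usage sketch of the helper above; the surrounding processor code is hypothetical and assumes a ProcessContext, a FlowFile and a ComponentLog are in scope, as they would be inside a processor's onTrigger:

// Hypothetical caller: 'context', 'flowFile' and 'getLogger()' come from the processor framework.
Map<String, String> dynamicProperties = IcebergUtils.getDynamicProperties(context, flowFile);
for (Map.Entry<String, String> entry : dynamicProperties.entrySet()) {
    // Keys are the dynamic property names; values are already evaluated against the FlowFile
    // attributes and are non-blank thanks to the filter in the helper.
    getLogger().debug("Dynamic property {} = {}", entry.getKey(), entry.getValue());
}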

View File

@@ -28,6 +28,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.Tasks;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -39,6 +40,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;
@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " +
@@ -75,12 +78,19 @@ import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFr
"The target Iceberg table should already exist and it must have matching schemas with the incoming records, " +
"which means the Record Reader schema must contain all the Iceberg schema fields, every additional field which is not present in the Iceberg schema will be ignored. " +
"To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
@DynamicProperty(
name = "A custom key to add to the snapshot summary. The key must start with the 'snapshot-property.' prefix.",
value = "A custom value to add to the snapshot summary.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds an entry with the custom key and corresponding value to the snapshot summary. The key format must be 'snapshot-property.custom-key'.")
@WritesAttributes({
@WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile.")
})
public class PutIceberg extends AbstractIcebergProcessor {
public static final String ICEBERG_RECORD_COUNT = "iceberg.record.count";
public static final String ICEBERG_SNAPSHOT_SUMMARY_PREFIX = "snapshot-property.";
public static final String ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID = "nifi-flowfile-uuid";
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
@@ -116,6 +126,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
.defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
.required(true)
.build();
static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder()
.name("file-format")
.displayName("File Format")
@@ -211,6 +222,25 @@ public class PutIceberg extends AbstractIcebergProcessor {
return RELATIONSHIPS;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator((subject, input, context) -> {
ValidationResult.Builder builder = new ValidationResult.Builder().subject(subject).input(input);
if (subject.startsWith(ICEBERG_SNAPSHOT_SUMMARY_PREFIX)) {
builder.valid(true);
} else {
builder.valid(false).explanation("Dynamic property key must begin with '" + ICEBERG_SNAPSHOT_SUMMARY_PREFIX + "'");
}
return builder.build();
})
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>();
@@ -322,8 +352,8 @@
* Appends the pending data files to the given {@link Table}.
*
* @param context processor context
* @param table table to append
* @param result datafiles created by the {@link TaskWriter}
*/
void appendDataFiles(ProcessContext context, FlowFile flowFile, Table table, WriteResult result) {
final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions(flowFile).asInteger();
@@ -334,6 +364,8 @@
final AppendFiles appender = table.newAppend();
Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
addSnapshotSummaryProperties(context, appender, flowFile);
Tasks.foreach(appender)
.exponentialBackoff(minimumCommitWaitTime, maximumCommitWaitTime, maximumCommitDuration, 2.0)
.retry(numberOfCommitRetries)
@@ -341,6 +373,22 @@
.run(PendingUpdate::commit);
}
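The NiFi-side retry above wraps Iceberg's own commit retry logic, which is driven by table properties. As a hedged sketch (not part of this change), the table-level settings could be tuned roughly as follows; the property keys are assumed to match the Iceberg release in use:

// Sketch: tuning the Iceberg table's own commit retry behaviour, which the retry loop above builds on.
// 'table' is an org.apache.iceberg.Table obtained from the catalog; the keys are assumptions, not taken from this PR.
table.updateProperties()
        .set("commit.retry.num-retries", "4")     // commit attempts after a conflict
        .set("commit.retry.min-wait-ms", "100")   // initial backoff between attempts
        .set("commit.retry.max-wait-ms", "60000") // upper bound on the backoff
        .commit();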
/**
* Adds the FlowFile's UUID and any additional entries provided through dynamic properties to the snapshot summary.
*
* @param context processor context
* @param appender table appender on which the snapshot summary entries are set
* @param flowFile the FlowFile to take the UUID from
*/
private void addSnapshotSummaryProperties(ProcessContext context, AppendFiles appender, FlowFile flowFile) {
appender.set(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID, flowFile.getAttribute(CoreAttributes.UUID.key()));
for (Map.Entry<String, String> dynamicProperty : getDynamicProperties(context, flowFile).entrySet()) {
String key = dynamicProperty.getKey().substring(ICEBERG_SNAPSHOT_SUMMARY_PREFIX.length());
appender.set(key, dynamicProperty.getValue());
}
}
/**
* Determines the write file format from the requested value and the table configuration.
*

View File

@@ -54,5 +54,12 @@
The NiFi side retry logic is built on top of the Iceberg commit retry logic which can be configured through table properties. See more: <a href="https://iceberg.apache.org/docs/latest/configuration/#table-behavior-properties">Table behavior properties</a>
</p>
<h3>Snapshot summary properties</h3>
<p>
The processor provides an option to add additional entries to the snapshot summary using dynamic properties.
The dynamic property key must have the 'snapshot-property.' prefix, but the entry is written to the summary without the prefix.
Each snapshot also automatically records the FlowFile's UUID in the 'nifi-flowfile-uuid' summary property.
</p>
</body>
</html>
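To make the documented behaviour concrete, here is a hedged sketch of reading the resulting summary back through the Iceberg API; the 'table' handle and the custom entry name are assumptions for the example:

// Sketch: inspecting the snapshot summary written by PutIceberg.
// 'table' is an org.apache.iceberg.Table loaded from the same catalog; the custom key is illustrative.
Map<String, String> summary = table.currentSnapshot().summary();
String flowFileUuid = summary.get("nifi-flowfile-uuid");   // written automatically for every snapshot
String ingestionSource = summary.get("ingestion-source");  // from a 'snapshot-property.ingestion-source' dynamic property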

View File

@@ -120,4 +120,28 @@ public class TestPutIcebergCustomValidation {
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.assertNotValid();
}
@Test
public void testInvalidSnapshotSummaryDynamicProperty() throws InitializationException {
initRecordReader();
initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setProperty("invalid.dynamic.property", "test value");
runner.assertNotValid();
}
@Test
public void testValidSnapshotSummaryDynamicProperty() throws InitializationException {
initRecordReader();
initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setProperty("snapshot-property.valid-property", "test value");
runner.assertValid();
}
}

View File

@@ -43,7 +43,6 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -54,11 +53,13 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID;
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
@@ -171,8 +172,8 @@ public class TestPutIcebergWithHiveCatalog {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
String tableLocation = new URI(table.location()).getPath();
Assertions.assertTrue(table.spec().isPartitioned());
Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
assertTrue(table.spec().isPartitioned());
assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList(
@@ -208,8 +209,8 @@ public class TestPutIcebergWithHiveCatalog {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
String tableLocation = new URI(table.location()).getPath();
Assertions.assertTrue(table.spec().isPartitioned());
Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
assertTrue(table.spec().isPartitioned());
assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList(
@@ -246,8 +247,8 @@ public class TestPutIcebergWithHiveCatalog {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
String tableLocation = new URI(table.location()).getPath();
Assertions.assertTrue(table.spec().isPartitioned());
Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
assertTrue(table.spec().isPartitioned());
assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(tableLocation, 4);
validatePartitionFolders(tableLocation, Arrays.asList(
@@ -267,7 +268,8 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");
runner.setProperty("snapshot-property.additional-summary-property", "test summary property");
Map<String, String> attributes = new HashMap<>();
attributes.put("catalog.name", CATALOG_NAME);
attributes.put("table.name", TABLE_NAME);
attributes.put("max.filesize", "536870912"); // 512 MB
@@ -286,11 +288,12 @@ public class TestPutIcebergWithHiveCatalog {
runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).get(0);
Assertions.assertTrue(table.spec().isUnpartitioned());
Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
assertTrue(table.spec().isUnpartitioned());
assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
assertProvenanceEvents();
assertSnapshotSummaryProperties(table, Collections.singletonMap("additional-summary-property", "test summary property"));
}
private void assertProvenanceEvents() {
@@ -300,4 +303,14 @@ public class TestPutIcebergWithHiveCatalog {
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + TABLE_NAME));
}
private void assertSnapshotSummaryProperties(Table table, Map<String, String> summaryProperties) {
Map<String, String> snapshotSummary = table.currentSnapshot().summary();
assertTrue(snapshotSummary.containsKey(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID));
for (Map.Entry<String, String> entry : summaryProperties.entrySet()) {
assertEquals(entry.getValue(), snapshotSummary.get(entry.getKey()));
}
}
}