NIFI-12054: PutIceberg should produce a provenance send event

This closes #7690.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Mark Bathori 2023-09-14 14:01:36 +02:00 committed by Peter Turcsanyi
parent 4d4c97d091
commit ea4c2055d6
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 19 additions and 0 deletions

View File

@ -234,6 +234,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
@Override @Override
public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException { public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
final long startNanos = System.nanoTime();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String fileFormat = context.getProperty(FILE_FORMAT).getValue(); final String fileFormat = context.getProperty(FILE_FORMAT).getValue();
final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue(); final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue();
@ -281,6 +282,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
} }
flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount)); flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, table.location(), transferMillis);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }

View File

@ -32,6 +32,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService; import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
import org.apache.nifi.processors.iceberg.util.IcebergTestUtils; import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
@ -60,6 +62,8 @@ import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData; import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles; import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders; import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.WINDOWS; import static org.junit.jupiter.api.condition.OS.WINDOWS;
@DisabledOnOs(WINDOWS) @DisabledOnOs(WINDOWS)
@ -174,6 +178,7 @@ public class TestPutIcebergWithHiveCatalog {
validateNumberOfDataFiles(tableLocation, 3); validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList( validatePartitionFolders(tableLocation, Arrays.asList(
"department_bucket=0", "department_bucket=1", "department_bucket=2")); "department_bucket=0", "department_bucket=1", "department_bucket=2"));
assertProvenanceEvents();
} }
@ParameterizedTest @ParameterizedTest
@ -211,6 +216,7 @@ public class TestPutIcebergWithHiveCatalog {
validateNumberOfDataFiles(tableLocation, 3); validateNumberOfDataFiles(tableLocation, 3);
validatePartitionFolders(tableLocation, Arrays.asList( validatePartitionFolders(tableLocation, Arrays.asList(
"department=Finance", "department=Marketing", "department=Sales")); "department=Finance", "department=Marketing", "department=Sales"));
assertProvenanceEvents();
} }
@ParameterizedTest @ParameterizedTest
@ -253,6 +259,7 @@ public class TestPutIcebergWithHiveCatalog {
"name=Joana/department=Sales/", "name=Joana/department=Sales/",
"name=John/department=Finance/" "name=John/department=Finance/"
)); ));
assertProvenanceEvents();
} }
@ParameterizedTest @ParameterizedTest
@ -287,5 +294,14 @@ public class TestPutIcebergWithHiveCatalog {
Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT)); Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
validateData(table, expectedRecords, 0); validateData(table, expectedRecords, 0);
validateNumberOfDataFiles(new URI(table.location()).getPath(), 1); validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
assertProvenanceEvents();
}
private void assertProvenanceEvents() {
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + TABLE_NAME));
} }
} }