diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index f7c082bf12..a967b9037a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -158,6 +158,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
 
     public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+    public static final String HADOOP_FILE_URL_ATTRIBUTE = "hadoop.file.url";
 
     protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";
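Every change below derives the new attribute's value from Path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()). A minimal, self-contained sketch of what that call yields — the namenode authority and working directory here are hypothetical, not taken from the patch:

import java.net.URI;

import org.apache.hadoop.fs.Path;

public class QualifiedPathSketch {
    public static void main(final String[] args) {
        // A relative path is resolved against the working directory first,
        // then the filesystem's scheme and authority are applied.
        final Path relative = new Path("data/file.txt");
        final Path qualified = relative.makeQualified(
                URI.create("hdfs://namenode.example.com:8020"), new Path("/user/nifi"));
        System.out.println(qualified); // hdfs://namenode.example.com:8020/user/nifi/data/file.txt
    }
}

Because the scheme and authority come from the live FileSystem, the same call produces file: URLs when run against a local filesystem, which matters for the unit tests later in this patch.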
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 33e762fa31..03770ac56f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -16,21 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +42,21 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * Base processor for reading a data from HDFS that can be fetched into records.
  */
@@ -234,6 +234,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                     final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                    successFlowFile = session.putAttribute(successFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                     getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
                     session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                     session.transfer(successFlowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index c2fb3bd725..5aef6b4621 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -378,6 +378,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 // Send a provenance event and transfer to success
                 final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                putFlowFile = session.putAttribute(putFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                 session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                 session.transfer(putFlowFile, REL_SUCCESS);
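Both abstract record processors reassign the result of session.putAttribute before transferring. NiFi FlowFiles are immutable, so only the returned reference carries the new attribute; dropping it would silently lose the URL. A sketch of the pattern — the helper class and method are illustrative, not part of the patch:

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

class HadoopFileUrlPattern {
    // FlowFiles are immutable: putAttribute returns a new FlowFile, and only
    // that returned reference carries the attribute into later session calls.
    FlowFile withHadoopFileUrl(final ProcessSession session, FlowFile flowFile, final String qualifiedUrl) {
        flowFile = session.putAttribute(flowFile, "hadoop.file.url", qualifiedUrl);
        return flowFile;
    }
}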
" + "If multiple paths are deleted, then only the last path is set."), + @WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file to be deleted."), @WritesAttribute(attribute="hdfs.error.message", description="HDFS error message related to the hdfs.error.code") }) @SeeAlso({ListHDFS.class, PutHDFS.class}) @@ -176,6 +177,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor { fileSystem.delete(path, isRecursive(context, session)); getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()}); final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()); + flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); } catch (IOException ioe) { // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index e4acaac109..2a4986cadd 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -61,8 +62,11 @@ import java.util.concurrent.TimeUnit; @Tags({"hadoop", "hcfs", "hdfs", "get", "ingest", "fetch", "source"}) @CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. 
" + "The file in HDFS is left intact without any changes being made to it.") -@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " - + "not be fetched from HDFS") +@WritesAttributes({ + @WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " + + "not be fetched from HDFS"), + @WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute.") +}) @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class}) @Restricted(restrictions = { @Restriction( @@ -173,6 +177,7 @@ public class FetchHDFS extends AbstractHadoopProcessor { stopWatch.stop(); getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()}); + flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(flowFile, getSuccessRelationship()); } catch (final FileNotFoundException | AccessControlException e) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java index a01d236f51..d7a3d13045 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java @@ -75,7 +75,9 @@ import java.util.regex.Pattern; @ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.") @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."), - @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")}) + @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."), + @WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute.") +}) @SeeAlso({PutHDFS.class, GetHDFS.class}) @Restricted(restrictions = { @Restriction( @@ -426,6 +428,8 @@ public class MoveHDFS extends AbstractHadoopProcessor { final String hdfsPath = newFile.getParent().toString(); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename); flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath); + final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory()); + flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/"); session.getProvenanceReporter().send(flowFile, transitUri); session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index eb7dfb56bb..91e91ff7b1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -89,6 +89,7 @@ import java.util.stream.Stream;
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
     @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."),
+    @WritesAttribute(attribute = "hadoop.file.url", description = "The Hadoop URL for the file is stored in this attribute."),
     @WritesAttribute(attribute = "target.dir.created", description = "The result(true/false) indicates if the folder is created by the processor.")
 })
 @SeeAlso(GetHDFS.class)
@@ -456,6 +457,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
                 putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
                 final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                putFlowFile = session.putAttribute(putFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                 session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                 session.transfer(putFlowFile, getSuccessRelationship());
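The tests that follow assert the new attribute with endsWith rather than comparing full URLs, because they run against the local filesystem, where qualification yields a file: scheme instead of hdfs://. A standalone illustration (output depends on the working directory):

import java.net.URI;

import org.apache.hadoop.fs.Path;

public class LocalQualificationSketch {
    public static void main(final String[] args) {
        final Path target = new Path("target/test-classes/randombytes-1");
        final Path qualified = target.makeQualified(
                URI.create("file:///"), new Path(System.getProperty("user.dir")));
        // Prints something like file:/<project-dir>/target/test-classes/randombytes-1,
        // so only the trailing path segment is stable enough to assert on.
        System.out.println(qualified);
    }
}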
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index ff603352ee..ab33df270e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -233,6 +233,7 @@ public class PutHDFSTest {
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
         assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
+        assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
 
         verify(spyFileSystem, times(1)).rename(any(Path.class), any(Path.class));
     }
@@ -267,6 +268,8 @@ public class PutHDFSTest {
         assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
         assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
 
         verify(spyFileSystem, Mockito.never()).rename(any(Path.class), any(Path.class));
     }
@@ -304,6 +307,7 @@ public class PutHDFSTest {
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
         assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+        assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith("target/test-classes/randombytes-1"));
     }
 
     @Test
@@ -330,6 +334,7 @@ public class PutHDFSTest {
         assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1.gz")));
         assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME + ".gz"));
     }
 
     @Test
@@ -423,6 +428,7 @@ public class PutHDFSTest {
         assertTrue(mockFileSystem.exists(new Path("target/data_test/randombytes-1")));
         assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith("target/data_test/" + FILE_NAME));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index e3472bf331..6b64cca767 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -36,6 +36,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -77,6 +78,10 @@ public class TestDeleteHDFS {
         assertEquals(1, provenanceEvents.size());
         assertEquals(ProvenanceEventType.REMOTE_INVOCATION, provenanceEvents.get(0).getEventType());
         assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", provenanceEvents.get(0).getTransitUri());
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE));
+
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 4d79c309a7..43f0c3ce6d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -71,6 +72,9 @@ public class TestFetchHDFS {
         assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
         assertTrue(fetchEvent.getTransitUri().endsWith(file));
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS).get(0);
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(file));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
index f691038c58..4e5511ac7d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
@@ -60,6 +60,7 @@ import java.util.List;
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The name of the file is stored in this attribute."),
     @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
+    @WritesAttribute(attribute = "hadoop.file.url", description = "The Hadoop URL for the file is stored in this attribute."),
     @WritesAttribute(attribute = "record.count", description = "The number of records written to the ORC file"),
     @WritesAttribute(attribute = "hive.ddl", description = "Creates a partial Hive DDL statement for creating an external table in Hive from the destination folder. "
             + "This can be used in ReplaceText for setting the content to the DDL. To make it valid DDL, add \"LOCATION ''\", where "
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index 38338d7c9c..b6390f9fb5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -80,6 +80,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.function.BiFunction;
 
+import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -191,6 +192,7 @@ public class PutORCTest {
         mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE, "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC");
+        assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + filename));
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
@@ -260,6 +262,7 @@ public class PutORCTest {
         // DDL will be created with field names normalized (lowercased, e.g.) for Hive by default
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE, "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED AS ORC");
+        assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + filename));
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index 17e99c6132..6d289da8ba 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -48,7 +48,8 @@ import java.io.IOException;
 @WritesAttributes({
     @WritesAttribute(attribute="fetch.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added "
             + "indicating why the file could not be fetched from the given filesystem."),
-    @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file"),
+    @WritesAttribute(attribute = "hadoop.file.url", description = "The Hadoop URL for the file is stored in this attribute.")
 })
 @SeeAlso({PutParquet.class})
 @Restricted(restrictions = {
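For downstream flows, the attribute makes the file's full URL available without reassembling it from the path and filename attributes. A hypothetical downstream fragment — the class and method are illustrative only, not part of the patch:

import org.apache.nifi.flowfile.FlowFile;

final class HadoopFileUrlReader {
    // Illustrative only: any custom processor or script downstream of the
    // patched processors can read the ready-made URL directly.
    String hadoopFileUrl(final FlowFile flowFile) {
        return flowFile.getAttribute("hadoop.file.url"); // e.g. hdfs://nn.example.com:8020/data/users.parquet
    }
}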
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 2d656f2b10..eab970bd0d 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -66,6 +66,7 @@ import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The name of the file is stored in this attribute."),
     @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
+    @WritesAttribute(attribute = "hadoop.file.url", description = "The Hadoop URL for the file is stored in this attribute."),
     @WritesAttribute(attribute = "record.count", description = "The number of records written to the Parquet file")
 })
 @Restricted(restrictions = {
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 3bb5475bd9..df412c8fe2 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -131,6 +132,7 @@ public class FetchParquetTest {
         final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
         flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(USERS));
         flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+        assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
 
         // the mock record writer will write the header for each record so replace those to get down to just the records
         String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 302e36a5d9..befc898d4d 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -16,22 +16,6 @@
  */
 package org.apache.nifi.processors.parquet;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
@@ -42,10 +26,10 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.hadoop.exception.FailureException;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
-import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -70,6 +54,23 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 import org.mockito.Mockito;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
 @DisabledOnOs(OS.WINDOWS)
 public class PutParquetTest {
 
@@ -135,6 +136,7 @@ public class PutParquetTest {
         mockFlowFile.assertAttributeEquals(PutParquet.ABSOLUTE_HDFS_PATH_ATTRIBUTE, avroParquetFile.getParent().toString());
         mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
         mockFlowFile.assertAttributeEquals(PutParquet.RECORD_COUNT_ATTR, "100");
+        assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + filename));
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
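Taken together, the test changes all follow one assertion pattern: fetch the first success FlowFile and check only the stable suffix of the URL, since the scheme differs between local (file:) and cluster (hdfs://) runs. A condensed sketch of that pattern as a reusable helper — the helper class is illustrative, not part of the patch:

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;

final class HadoopFileUrlAssertions {
    // Condenses the assertion repeated across the tests in this patch.
    static void assertHadoopFileUrlEndsWith(final TestRunner runner, final Relationship success, final String expectedSuffix) {
        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(success).get(0);
        assertTrue(flowFile.getAttribute("hadoop.file.url").endsWith(expectedSuffix));
    }
}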