From 77a51e1a9ebee2162af48c5bdd5b79703e65b4e2 Mon Sep 17 00:00:00 2001
From: Koji Kawamura
Date: Mon, 30 Oct 2017 11:35:13 +0900
Subject: [PATCH] NIFI-4544: Improve HDFS processors provenance transit URL

Signed-off-by: Pierre Villard

This closes #2238.
---
 .../processors/hadoop/AbstractFetchHDFSRecord.java    |  8 ++++----
 .../nifi/processors/hadoop/AbstractPutHDFSRecord.java |  5 ++---
 .../org/apache/nifi/processors/hadoop/FetchHDFS.java  | 11 +++++------
 .../org/apache/nifi/processors/hadoop/GetHDFS.java    |  3 +--
 .../org/apache/nifi/processors/hadoop/PutHDFS.java    |  5 ++---
 .../apache/nifi/processors/hadoop/GetHDFSTest.java    |  8 ++++++++
 .../apache/nifi/processors/hadoop/PutHDFSTest.java    |  9 +++++++++
 .../apache/nifi/processors/hadoop/TestFetchHDFS.java  |  8 ++++++++
 .../nifi/processors/parquet/PutParquetTest.java       |  3 ++-
 9 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index d6a374f50f..96631ef5d0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -20,7 +20,6 @@ import java.io.BufferedOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URI;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -232,9 +231,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                             attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                             successFlowFile = session.putAllAttributes(successFlowFile, attributes);
 
-                            final URI uri = path.toUri();
-                            getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
-                            session.getProvenanceReporter().fetch(successFlowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+
+                            final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                            getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
+                            session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                             session.transfer(successFlowFile, REL_SUCCESS);
                             session.remove(originalFlowFile);
                             return null;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index e08b4fb367..22f61b22f9 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -349,7 +349,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
 
                 putFlowFile = postProcess(context, session, putFlowFile, destFile);
 
-                final String outputPath = destFile.toString();
                 final String newFilename = destFile.getName();
                 final String hdfsPath = destFile.getParent().toString();
 
@@ -361,8 +360,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 putFlowFile = session.putAllAttributes(putFlowFile, attributes);
 
                 // Send a provenance event and transfer to success
-                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-                session.getProvenanceReporter().send(putFlowFile, transitUri);
+                final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                 session.transfer(putFlowFile, REL_SUCCESS);
 
             } catch (IOException | FlowFileAccessException e) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 08a8ce21cf..4237503aae 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -46,7 +46,6 @@ import org.apache.nifi.util.StopWatch;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -128,7 +127,6 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final URI uri = path.toUri();
        final StopWatch stopWatch = new StopWatch(true);
         final FlowFile finalFlowFile = flowFile;
 
@@ -149,6 +147,7 @@
                 }
 
                 FlowFile flowFile = finalFlowFile;
+                final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                 try {
                     final String outputFilename;
                     final String originalFilename = path.getName();
@@ -166,16 +165,16 @@
                     flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
                     stopWatch.stop();
 
-                    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
-                    session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
+                    session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                     session.transfer(flowFile, REL_SUCCESS);
                 } catch (final FileNotFoundException | AccessControlException e) {
-                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
                     flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
                     flowFile = session.penalize(flowFile);
                     session.transfer(flowFile, REL_FAILURE);
                 } catch (final IOException e) {
-                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
                     flowFile = session.penalize(flowFile);
                     session.transfer(flowFile, REL_COMMS_FAILURE);
                 } finally {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 7357d3542a..64730c84a8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -381,8 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                         continue;
                     }
 
-                    final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
-                    session.getProvenanceReporter().receive(flowFile, transitUri);
+                    session.getProvenanceReporter().receive(flowFile, file.toString());
                     session.transfer(flowFile, REL_SUCCESS);
                     getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
                             new Object[]{flowFile, file, millis, dataRate});
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 4a9b2c1dba..fe627024b7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -357,13 +357,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                         new Object[]{putFlowFile, copyFile, millis, dataRate});
 
-                final String outputPath = copyFile.toString();
                 final String newFilename = copyFile.getName();
                 final String hdfsPath = copyFile.getParent().toString();
                 putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
                 putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
 
-                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-                session.getProvenanceReporter().send(putFlowFile, transitUri);
+                final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                 session.transfer(putFlowFile, REL_SUCCESS);
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 325ba3a862..40666d9cc4 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -21,6 +21,8 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -214,6 +216,12 @@ public class GetHDFSTest {
         assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
         InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
         flowFile.assertContentEquals(expected);
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord receiveEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, receiveEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(receiveEvent.getTransitUri().endsWith("13545423550275052.zip"));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 2d3ad791b4..32569ac59f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -27,6 +27,8 @@ import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -220,6 +222,13 @@ public class PutHDFSTest {
         assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
         assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 8b8b568f27..74c48a67f5 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.hadoop;
 
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
@@ -61,6 +63,12 @@ public class TestFetchHDFS {
         runner.enqueue(new String("trigger flow file"));
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord fetchEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(fetchEvent.getTransitUri().endsWith(file));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index e634e2e450..9e7943e8aa 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -143,7 +143,8 @@ public class PutParquetTest {
         // verify it was a SEND event with the correct URI
         final ProvenanceEventRecord provEvent = provEvents.get(0);
         Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
-        Assert.assertEquals("hdfs://" + avroParquetFile.toString(), provEvent.getTransitUri());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
 
         // verify the content of the parquet file by reading it back in
         verifyAvroParquetUsers(avroParquetFile, 100);
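
For reviewers, a minimal, self-contained sketch of the behavior this patch corrects.
The old code built the transit URL by string concatenation, so an absolute path such
as /data/input/file.txt became "hdfs://data/input/file.txt", where "data" is parsed
as the URI authority and the NameNode address is lost. Path.makeQualified(URI, Path)
instead resolves the path against the FileSystem's URI and working directory. The
NameNode URI, working directory, and file paths below are invented example values,
not values taken from the patch:

    import java.net.URI;

    import org.apache.hadoop.fs.Path;

    // Sketch only: compares the old concatenation-based transit URL with the
    // qualified path this patch reports. All values are made-up examples.
    public class TransitUriSketch {

        public static void main(String[] args) {
            // In the processors these come from fileSystem.getUri() and
            // fileSystem.getWorkingDirectory(); hard-coded here for illustration.
            final URI fsUri = URI.create("hdfs://namenode.example.com:8020");
            final Path workingDir = new Path("hdfs://namenode.example.com:8020/user/nifi");

            final Path outputFile = new Path("/data/input/file.txt");

            // Old approach: prefix the scheme by concatenation. The first path
            // segment lands in the URI authority position, so the NameNode host
            // is dropped from the transit URL.
            final String outputPath = outputFile.toString();
            final String oldTransitUri = outputPath.startsWith("/")
                    ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
            System.out.println(oldTransitUri);
            // prints: hdfs://data/input/file.txt

            // New approach: qualify the path against the filesystem URI and the
            // working directory, keeping the scheme and NameNode authority.
            System.out.println(outputFile.makeQualified(fsUri, workingDir));
            // prints: hdfs://namenode.example.com:8020/data/input/file.txt

            // Relative paths are first resolved against the working directory.
            System.out.println(new Path("data/input/file.txt").makeQualified(fsUri, workingDir));
            // prints: hdfs://namenode.example.com:8020/user/nifi/data/input/file.txt
        }
    }

The same reasoning explains the endsWith() assertions added to the tests: the unit
tests run against the local filesystem, where the transit URI carries a file: scheme
(or a bare local path in GetHDFS, which now reports file.toString()), so the tests
assert only the path suffix rather than a full "hdfs://" URL.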