From f36eea3701598be05f09e7757d06748bda776ed6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 12 Jan 2015 09:09:13 -0500 Subject: [PATCH] NIFI-246: fixed provenance transit uris to include hdfs:// prefix --- .../main/java/org/apache/nifi/processors/hadoop/GetHDFS.java | 3 ++- .../main/java/org/apache/nifi/processors/hadoop/PutHDFS.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 2cab573515..20ac7381fb 100644 --- a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -363,7 +363,8 @@ public class GetHDFS extends AbstractHadoopProcessor { continue; } - session.getProvenanceReporter().receive(flowFile, file.toString()); + final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename; + session.getProvenanceReporter().receive(flowFile, transitUri); session.transfer(flowFile, REL_SUCCESS); getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", new Object[]{flowFile, file, millis, dataRate}); diff --git a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 5768da0d87..e84b575416 100644 --- a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -47,7 +47,6 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.Tuple; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -322,7 +321,9 @@ public class PutHDFS extends AbstractHadoopProcessor { getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", new Object[]{flowFile, copyFile, millis, dataRate}); - session.getProvenanceReporter().send(flowFile, copyFile.toString()); + final String filename = copyFile.toString(); + final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename; + session.getProvenanceReporter().send(flowFile, transitUri); session.transfer(flowFile, REL_SUCCESS); } catch (final Throwable t) {