NIFI-246: fixed provenance transit uris to include hdfs:// prefix

This commit is contained in:
Mark Payne 2015-01-12 09:09:13 -05:00
parent 78c069fb5b
commit f36eea3701
2 changed files with 5 additions and 3 deletions

View File

@ -363,7 +363,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
continue;
}
session.getProvenanceReporter().receive(flowFile, file.toString());
final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, file, millis, dataRate});

View File

@ -47,7 +47,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -322,7 +321,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, copyFile, millis, dataRate});
session.getProvenanceReporter().send(flowFile, copyFile.toString());
final String filename = copyFile.toString();
final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Throwable t) {