From 1c1738670ca5f921863572434b6f4cc4daddad38 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 26 Oct 2015 17:14:29 -0400 Subject: [PATCH] NIFI-10: Updated FetchHDFS, FetchFileTransfer to use new FETCH provenance event --- .../main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java | 2 +- .../org/apache/nifi/processors/standard/FetchFileTransfer.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index aa03e73b5b..c27ade9a7a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -111,7 +111,7 @@ public class FetchHDFS extends AbstractHadoopProcessor { flowFile = session.importFrom(inStream, flowFile); stopWatch.stop(); getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()}); - session.getProvenanceReporter().modifyContent(flowFile, "Fetched content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); } catch (final FileNotFoundException | AccessControlException e) { getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index a405afb60d..f3fa347488 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -278,8 +278,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { flowFile = session.putAllAttributes(flowFile, attributes); // emit provenance event and transfer FlowFile - session.getProvenanceReporter().modifyContent(flowFile, "Content replaced with content from " + protocolName + "://" + host + ":" + port + "/" + filename, - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where