mirror of https://github.com/apache/nifi.git
NIFI-10: Updated FetchHDFS, FetchFileTransfer to use new FETCH provenance event
This commit is contained in:
parent
17006335e5
commit
1c1738670c
|
@ -111,7 +111,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
|
||||||
flowFile = session.importFrom(inStream, flowFile);
|
flowFile = session.importFrom(inStream, flowFile);
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
|
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
|
||||||
session.getProvenanceReporter().modifyContent(flowFile, "Fetched content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS));
|
session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
} catch (final FileNotFoundException | AccessControlException e) {
|
} catch (final FileNotFoundException | AccessControlException e) {
|
||||||
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
|
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
|
||||||
|
|
|
@ -278,8 +278,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
|
||||||
// emit provenance event and transfer FlowFile
|
// emit provenance event and transfer FlowFile
|
||||||
session.getProvenanceReporter().modifyContent(flowFile, "Content replaced with content from " + protocolName + "://" + host + ":" + port + "/" + filename,
|
session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||||
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
|
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
|
||||||
|
|
Loading…
Reference in New Issue