From 2a69b35f12586da16288e1a6d5e1dd3562490b92 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 30 Oct 2017 10:10:22 +0900 Subject: [PATCH] NIFI-4548: Add REMOTE_INVOCATION provenance event type This commit includes changes to DeleteHDFS to report REMOTE_INVOCATION event. In order to do so, the processor had to be changed to create output FlowFile because a provenance event needs a FlowFile it associates with. Signed-off-by: Pierre Villard This closes #2234. --- .../nifi/provenance/ProvenanceEventType.java | 7 +++ .../nifi/provenance/ProvenanceReporter.java | 26 +++++++++++ .../nifi/util/MockProvenanceReporter.java | 19 ++++++++ .../StandardProvenanceReporter.java | 19 ++++++++ .../js/nf/provenance/nf-provenance-table.js | 5 +++ .../nifi/processors/hadoop/DeleteHDFS.java | 45 +++++++++++-------- .../processors/hadoop/TestDeleteHDFS.java | 18 +++++++- 7 files changed, 119 insertions(+), 20 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java index 756d3620ef..19b13248e7 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java @@ -46,6 +46,13 @@ public enum ProvenanceEventType { */ SEND, + /** + * Indicates a provenance event for sending remote invocation request to an external process. + * This event type is used to represent other operations than transferring data (RECEIVE, FETCH or SEND), + * for example, deleting object from an external process or storage. + */ + REMOTE_INVOCATION, + /** * Indicates that the contents of a FlowFile were downloaded by a user or external entity. */ diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java index 39eed43ef4..a8f12a1643 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java @@ -316,6 +316,32 @@ public interface ProvenanceReporter { */ void send(FlowFile flowFile, String transitUri, String details, long transmissionMillis, boolean force); + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#REMOTE_INVOCATION} + * that indicates a remote invocation is requested to an external endpoint using + * the given FlowFile. The external endpoint may exist in a remote or a local system, + * but is external to NiFi. + * @param flowFile the FlowFile that was used to make the remote invocation + * @param transitUri A URI that provides information about the System and + * Protocol information over which the invocation occurred. The intent of this + * field is to identify they type and target resource or object of the invocation. + */ + void invokeRemoteProcess(FlowFile flowFile, String transitUri); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#REMOTE_INVOCATION} + * that indicates a remote invocation is requested to an external endpoint using + * the given FlowFile. The external endpoint may exist in a remote or a local system, + * but is external to NiFi. + * @param flowFile the FlowFile that was used to make the remote invocation + * @param transitUri A URI that provides information about the System and + * Protocol information over which the invocation occurred. The intent of this + * field is to identify they type and target resource or object of the invocation. + * @param details additional details related to the REMOTE_INVOCATION event, such as an + * explanation of the invoked process. + */ + void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details); + /** * Emits a Provenance Event of type * {@link ProvenanceEventType#ADDINFO ADDINFO} that provides a linkage diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java index 8dc06f13b2..37a6393e80 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java @@ -231,6 +231,25 @@ public class MockProvenanceReporter implements ProvenanceReporter { send(flowFile, transitUri, -1L, true); } + @Override + public void invokeRemoteProcess(final FlowFile flowFile, final String transitUri) { + invokeRemoteProcess(flowFile, transitUri, null); + } + + @Override + public void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details) { + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.REMOTE_INVOCATION) + .setTransitUri(transitUri).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + @Override public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) { try { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 2b600137ad..7d14ee39a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -229,6 +229,25 @@ public class StandardProvenanceReporter implements ProvenanceReporter { send(flowFile, transitUri, -1L, true); } + @Override + public void invokeRemoteProcess(final FlowFile flowFile, final String transitUri) { + invokeRemoteProcess(flowFile, transitUri, null); + } + + @Override + public void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details) { + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.REMOTE_INVOCATION) + .setTransitUri(transitUri).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + @Override public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) { try { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-table.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-table.js index 1076ccaf3a..5a426e5a55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-table.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/provenance/nf-provenance-table.js @@ -1351,6 +1351,11 @@ formatEventDetail('Transit Uri', event.transitUri); } + // conditionally show REMOTE_INVOCATION details + if (event.eventType === 'REMOTE_INVOCATION') { + formatEventDetail('Transit Uri', event.transitUri); + } + // conditionally show ADDINFO details if (event.eventType === 'ADDINFO') { formatEventDetail('Alternate Identifier Uri', event.alternateIdentifierUri); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java index 0fe9dc44a7..c75be68aad 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java @@ -60,8 +60,10 @@ import com.google.common.collect.Maps; + " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. ") @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") @WritesAttributes({ - @WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted"), - @WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request"), + @WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted. " + + "If multiple files are deleted, then only the last filename is set."), + @WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request. " + + "If multiple paths are deleted, then only the last path is set."), @WritesAttribute(attribute="hdfs.error.message", description="HDFS error message related to the hdfs.error.code") }) @SeeAlso({ListHDFS.class, PutHDFS.class}) @@ -132,12 +134,10 @@ public class DeleteHDFS extends AbstractHadoopProcessor { return; } - final String fileOrDirectoryName; - if (originalFlowFile == null) { - fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue(); - } else { - fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(originalFlowFile).getValue(); - } + // We need a FlowFile to report provenance correctly. + FlowFile flowFile = originalFlowFile != null ? originalFlowFile : session.create(); + + final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); final FileSystem fileSystem = getFileSystem(); try { @@ -154,34 +154,43 @@ public class DeleteHDFS extends AbstractHadoopProcessor { pathList.add(new Path(fileOrDirectoryName)); } + int failedPath = 0; for (Path path : pathList) { if (fileSystem.exists(path)) { try { + Map attributes = Maps.newHashMapWithExpectedSize(2); + attributes.put("hdfs.filename", path.getName()); + attributes.put("hdfs.path", path.getParent().toString()); + flowFile = session.putAllAttributes(flowFile, attributes); + fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean()); getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()}); + final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()); + session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString()); } catch (IOException ioe) { // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive. getLogger().warn("Failed to delete file or directory", ioe); - Map attributes = Maps.newHashMapWithExpectedSize(3); - attributes.put("hdfs.filename", path.getName()); - attributes.put("hdfs.path", path.getParent().toString()); + Map attributes = Maps.newHashMapWithExpectedSize(1); // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) attributes.put("hdfs.error.message", ioe.getMessage()); - session.transfer(session.putAllAttributes(session.clone(originalFlowFile), attributes), REL_FAILURE); + session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE); + failedPath++; } } } - if (originalFlowFile != null) { - session.transfer(originalFlowFile, DeleteHDFS.REL_SUCCESS); + + if (failedPath == 0) { + session.transfer(flowFile, DeleteHDFS.REL_SUCCESS); + } else { + // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure. + session.remove(flowFile); } } catch (IOException e) { - if (originalFlowFile != null) { - getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{originalFlowFile, e.getMessage()}, e); - session.transfer(originalFlowFile, DeleteHDFS.REL_FAILURE); - } + getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e); + session.transfer(flowFile, DeleteHDFS.REL_FAILURE); } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java index b77b71aca6..ed81fec951 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java @@ -25,12 +25,16 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.List; import java.util.Map; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; @@ -58,6 +62,7 @@ public class TestDeleteHDFS { public void testSuccessfulDelete() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setIncomingConnection(false); @@ -65,14 +70,21 @@ public class TestDeleteHDFS { runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString()); runner.assertValid(); runner.run(); - runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + // Even if there's no incoming relationship, a FlowFile is created to indicate which path is deleted. + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1); runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(1, provenanceEvents.size()); + assertEquals(ProvenanceEventType.REMOTE_INVOCATION, provenanceEvents.get(0).getEventType()); + assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", provenanceEvents.get(0).getTransitUri()); } @Test public void testDeleteFromIncomingFlowFile() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}"); @@ -157,6 +169,7 @@ public class TestDeleteHDFS { } when(mockFileSystem.exists(any(Path.class))).thenReturn(true); when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); + when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setIncomingConnection(false); @@ -164,7 +177,7 @@ public class TestDeleteHDFS { runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString()); runner.assertValid(); runner.run(); - runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1); } @Test @@ -180,6 +193,7 @@ public class TestDeleteHDFS { } when(mockFileSystem.exists(any(Path.class))).thenReturn(true); when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); + when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020")); DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(deleteHDFS); runner.setIncomingConnection(true);