NIFI-4548: Add REMOTE_INVOCATION provenance event type

This commit includes changes to DeleteHDFS so that it reports a REMOTE_INVOCATION
event. To do so, the processor had to be changed to create an output
FlowFile, because a provenance event needs a FlowFile to associate with
(a minimal sketch of this pattern follows the commit metadata below).

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2234.
Authored by Koji Kawamura on 2017-10-30 10:10:22 +09:00; committed by Pierre Villard
parent 116c846342
commit 2a69b35f12
7 changed files with 119 additions and 20 deletions
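
As context for the change described above, here is a minimal sketch of the pattern applied in DeleteHDFS: when the processor runs without an incoming FlowFile, one is created so that the REMOTE_INVOCATION event has a FlowFile to attach to. The class and method names below are illustrative only, not the actual DeleteHDFS code.

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;

    class RemoteInvocationSketch {
        // Illustrative shape of the change: ensure a FlowFile exists, perform
        // the remote operation, then report it as a REMOTE_INVOCATION event.
        void deleteRemoteObject(final ProcessSession session, final FlowFile incoming, final String transitUri) {
            // A provenance event must be associated with a FlowFile, so create one
            // when the processor runs without an incoming connection.
            FlowFile flowFile = incoming != null ? incoming : session.create();

            // ... perform the remote operation here (e.g. delete an HDFS path) ...

            // Report the non-data operation against the external system.
            session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUri);
        }
    }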

File: ProvenanceEventType.java

@@ -46,6 +46,13 @@ public enum ProvenanceEventType {
      */
     SEND,
+    /**
+     * Indicates a provenance event for sending a remote invocation request to an external process.
+     * This event type represents operations other than transferring data (RECEIVE, FETCH or SEND),
+     * for example, deleting an object from an external process or storage.
+     */
+    REMOTE_INVOCATION,
     /**
      * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
      */
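
Because REMOTE_INVOCATION explicitly excludes data transfer, a processor talking to an external system should pick the event type based on what the operation actually did. A minimal, hypothetical decision helper (the dataWasTransferred flag is an assumption for illustration, not part of this commit):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;

    class ProvenanceEventChoice {
        // Hypothetical helper: choose between SEND and REMOTE_INVOCATION.
        static void report(final ProcessSession session, final FlowFile flowFile,
                           final String transitUri, final boolean dataWasTransferred) {
            if (dataWasTransferred) {
                // Content was actually delivered to the external system.
                session.getProvenanceReporter().send(flowFile, transitUri);
            } else {
                // No content moved; the processor only asked the external system
                // to act (e.g. delete an object), so REMOTE_INVOCATION fits.
                session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUri);
            }
        }
    }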

File: ProvenanceReporter.java

@@ -316,6 +316,32 @@ public interface ProvenanceReporter {
      */
     void send(FlowFile flowFile, String transitUri, String details, long transmissionMillis, boolean force);

+    /**
+     * Emits a Provenance Event of type {@link ProvenanceEventType#REMOTE_INVOCATION}
+     * that indicates a remote invocation was requested at an external endpoint using
+     * the given FlowFile. The external endpoint may exist in a remote or a local system,
+     * but is external to NiFi.
+     *
+     * @param flowFile the FlowFile that was used to make the remote invocation
+     * @param transitUri a URI that provides information about the system and
+     * protocol over which the invocation occurred. The intent of this
+     * field is to identify the type and target resource or object of the invocation.
+     */
+    void invokeRemoteProcess(FlowFile flowFile, String transitUri);
+
+    /**
+     * Emits a Provenance Event of type {@link ProvenanceEventType#REMOTE_INVOCATION}
+     * that indicates a remote invocation was requested at an external endpoint using
+     * the given FlowFile. The external endpoint may exist in a remote or a local system,
+     * but is external to NiFi.
+     *
+     * @param flowFile the FlowFile that was used to make the remote invocation
+     * @param transitUri a URI that provides information about the system and
+     * protocol over which the invocation occurred. The intent of this
+     * field is to identify the type and target resource or object of the invocation.
+     * @param details additional details related to the REMOTE_INVOCATION event, such as an
+     * explanation of the invoked process
+     */
+    void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details);
+
     /**
      * Emits a Provenance Event of type
      * {@link ProvenanceEventType#ADDINFO ADDINFO} that provides a linkage
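
Where a processor has more to say about the operation, the three-argument overload annotates the event. For example (the URI and details values here are hypothetical):

    // Hypothetical call site: the details string is shown alongside the
    // event in NiFi's provenance view.
    session.getProvenanceReporter().invokeRemoteProcess(
            flowFile,
            "hdfs://namenode.example.com:8020/data/old.csv",
            "object deleted");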

File: MockProvenanceReporter.java

@@ -231,6 +231,25 @@ public class MockProvenanceReporter implements ProvenanceReporter {
         send(flowFile, transitUri, -1L, true);
     }

+    @Override
+    public void invokeRemoteProcess(final FlowFile flowFile, final String transitUri) {
+        invokeRemoteProcess(flowFile, transitUri, null);
+    }
+
+    @Override
+    public void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details) {
+        try {
+            final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.REMOTE_INVOCATION)
+                .setTransitUri(transitUri).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
     @Override
     public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) {
         try {

File: StandardProvenanceReporter.java

@@ -229,6 +229,25 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
         send(flowFile, transitUri, -1L, true);
     }

+    @Override
+    public void invokeRemoteProcess(final FlowFile flowFile, final String transitUri) {
+        invokeRemoteProcess(flowFile, transitUri, null);
+    }
+
+    @Override
+    public void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details) {
+        try {
+            final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.REMOTE_INVOCATION)
+                .setTransitUri(transitUri).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
     @Override
     public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) {
         try {
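
Note that both the mock and the standard reporter deliberately swallow failures here: an exception while building the event record is logged (with the full stack trace only at debug level) rather than rethrown, so a provenance-reporting problem cannot fail the session that triggered it.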

File: provenance event details UI (JavaScript)

@@ -1351,6 +1351,11 @@
             formatEventDetail('Transit Uri', event.transitUri);
         }

+        // conditionally show REMOTE_INVOCATION details
+        if (event.eventType === 'REMOTE_INVOCATION') {
+            formatEventDetail('Transit Uri', event.transitUri);
+        }
+
         // conditionally show ADDINFO details
         if (event.eventType === 'ADDINFO') {
             formatEventDetail('Alternate Identifier Uri', event.alternateIdentifierUri);

File: DeleteHDFS.java

@@ -60,8 +60,10 @@ import com.google.common.collect.Maps;
         + " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. ")
 @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
 @WritesAttributes({
-        @WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted"),
-        @WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request"),
+        @WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted. "
+                + "If multiple files are deleted, then only the last filename is set."),
+        @WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request. "
+                + "If multiple paths are deleted, then only the last path is set."),
         @WritesAttribute(attribute="hdfs.error.message", description="HDFS error message related to the hdfs.error.code")
 })
 @SeeAlso({ListHDFS.class, PutHDFS.class})

@@ -132,12 +134,10 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
             return;
         }

-        final String fileOrDirectoryName;
-        if (originalFlowFile == null) {
-            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
-        } else {
-            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(originalFlowFile).getValue();
-        }
+        // We need a FlowFile to report provenance correctly.
+        FlowFile flowFile = originalFlowFile != null ? originalFlowFile : session.create();
+
+        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();

         final FileSystem fileSystem = getFileSystem();
         try {

@@ -154,34 +154,43 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                 pathList.add(new Path(fileOrDirectoryName));
             }

+            int failedPath = 0;
             for (Path path : pathList) {
                 if (fileSystem.exists(path)) {
                     try {
+                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
+                        attributes.put("hdfs.filename", path.getName());
+                        attributes.put("hdfs.path", path.getParent().toString());
+                        flowFile = session.putAllAttributes(flowFile, attributes);
+
                         fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
                         getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
+
+                        final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                        session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
                     } catch (IOException ioe) {
                         // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
                         // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
                         getLogger().warn("Failed to delete file or directory", ioe);

-                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(3);
-                        attributes.put("hdfs.filename", path.getName());
-                        attributes.put("hdfs.path", path.getParent().toString());
+                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
                         // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
                         attributes.put("hdfs.error.message", ioe.getMessage());

-                        session.transfer(session.putAllAttributes(session.clone(originalFlowFile), attributes), REL_FAILURE);
+                        session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
+                        failedPath++;
                     }
                 }
             }

-            if (originalFlowFile != null) {
-                session.transfer(originalFlowFile, DeleteHDFS.REL_SUCCESS);
-            }
+            if (failedPath == 0) {
+                session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
+            } else {
+                // If any path failed to be deleted, remove the FlowFile, as it has been cloned and sent to failure.
+                session.remove(flowFile);
+            }
         } catch (IOException e) {
-            if (originalFlowFile != null) {
-                getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{originalFlowFile, e.getMessage()}, e);
-                session.transfer(originalFlowFile, DeleteHDFS.REL_FAILURE);
-            }
+            getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
+            session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
         }
     }
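
One behavioral consequence of this change: because a FlowFile is now created even when DeleteHDFS runs without an incoming connection, a successful connectionless run transfers one FlowFile to REL_SUCCESS instead of none, which is why the test expectations below change from 0 to 1.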

File: TestDeleteHDFS.java

@@ -25,12 +25,16 @@ import static org.mockito.Mockito.when;

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.List;
 import java.util.Map;

 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;

@@ -58,6 +62,7 @@ public class TestDeleteHDFS {
     public void testSuccessfulDelete() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020"));
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setIncomingConnection(false);

@@ -65,14 +70,21 @@ public class TestDeleteHDFS {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        // Even if there's no incoming relationship, a FlowFile is created to indicate which path is deleted.
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
         runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        assertEquals(ProvenanceEventType.REMOTE_INVOCATION, provenanceEvents.get(0).getEventType());
+        assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", provenanceEvents.get(0).getTransitUri());
     }

     @Test
     public void testDeleteFromIncomingFlowFile() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020"));
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");

@@ -157,6 +169,7 @@ public class TestDeleteHDFS {
         }
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
         when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020"));
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setIncomingConnection(false);

@@ -164,7 +177,7 @@ public class TestDeleteHDFS {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
         runner.assertValid();
         runner.run();
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
     }

     @Test

@@ -180,6 +193,7 @@ public class TestDeleteHDFS {
         }
         when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
         when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        when(mockFileSystem.getUri()).thenReturn(new URI("hdfs://0.example.com:8020"));
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
         runner.setIncomingConnection(true);