From 5f65b2561a25a10289a1a10c5325042ed988b105 Mon Sep 17 00:00:00 2001
From: Jeremy Dyer
Date: Tue, 14 Mar 2017 14:07:54 -0400
Subject: [PATCH] NIFI-3600 Improve logging and relationship routing for failures in DeleteHDFS

Realized that the session should be cloned here because it's inside a for
loop: the original flow file would be transferred but would not be the
latest flow file if an error occurred in the for loop.
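For context, the pattern in question, distilled from the change below into a
rough sketch (session, originalFlowFile, attributes, and REL_FAILURE are the
names used in DeleteHDFS itself; "failed" is only an illustrative local
variable):

    // Clone per failed delete so that several failures inside the loop can
    // each be routed to REL_FAILURE, while the original flow file is still
    // transferred exactly once after the loop completes.
    FlowFile failed = session.clone(originalFlowFile);
    failed = session.putAllAttributes(failed, attributes);
    session.transfer(failed, REL_FAILURE);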
") @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") +@WritesAttributes({ + @WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted"), + @WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request"), + @WritesAttribute(attribute="hdfs.error.message", description="HDFS error message related to the hdfs.error.code") +}) @SeeAlso({ListHDFS.class}) public class DeleteHDFS extends AbstractHadoopProcessor { @@ -146,8 +156,22 @@ public class DeleteHDFS extends AbstractHadoopProcessor { for (Path path : pathList) { if (fileSystem.exists(path)) { - fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean()); - getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()}); + try { + fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean()); + getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()}); + } catch (IOException ioe) { + // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible + // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive. + getLogger().warn("Failed to delete file or directory", ioe); + + Map attributes = Maps.newHashMapWithExpectedSize(3); + attributes.put("hdfs.filename", path.getName()); + attributes.put("hdfs.path", path.getParent().toString()); + // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.) 
+ attributes.put("hdfs.error.message", ioe.getMessage()); + + session.transfer(session.putAllAttributes(session.clone(originalFlowFile), attributes), REL_FAILURE); + } } } if (originalFlowFile != null) { @@ -161,4 +185,4 @@ public class DeleteHDFS extends AbstractHadoopProcessor { } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java index be16ac63b7..b77b71aca6 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.hadoop; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -96,6 +98,25 @@ public class TestDeleteHDFS { runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); } + @Test + public void testPermissionIOException() throws Exception { + Path filePath = new Path("/some/path/to/file.txt"); + when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.delete(any(Path.class), any(Boolean.class))).thenThrow(new IOException("Permissions Error")); + DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); + TestRunner runner = TestRunners.newTestRunner(deleteHDFS); + runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}"); + Map attributes = Maps.newHashMap(); + attributes.put("hdfs.file", filePath.toString()); + runner.enqueue("foo", attributes); + runner.run(); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0); + assertEquals("file.txt", flowFile.getAttribute("hdfs.filename")); + assertEquals("/some/path/to", flowFile.getAttribute("hdfs.path")); + assertEquals("Permissions Error", flowFile.getAttribute("hdfs.error.message")); + } + @Test public void testNoFlowFilesWithIncomingConnection() throws Exception { Path filePath = new Path("${hdfs.file}");