From 61c799d88b53c72cd38fad820284ecd115a6cf1b Mon Sep 17 00:00:00 2001 From: Francois Prunier Date: Fri, 3 Mar 2017 11:41:42 +0100 Subject: [PATCH] NIFI-3204 This closes #1561. Fix handling of deleting a path with a wildcard when the processor is invoked via an incoming flowfile. Applied Joseph Witt's patch from the NIFI-3204 JIRA. Signed-off-by: joewitt --- .../nifi/processors/hadoop/DeleteHDFS.java | 59 +++++++++---------- .../processors/hadoop/TestDeleteHDFS.java | 55 +++++++++-------- 2 files changed, 56 insertions(+), 58 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java index ed4d10db82..cdabc80cc5 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.hadoop; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,33 +38,38 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.nifi.annotation.documentation.SeeAlso; @TriggerWhenEmpty @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) -@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem", "restricted" }) -@CapabilityDescription("Deletes a file from HDFS. The file can be provided as an attribute from an incoming FlowFile, " - + "or a statically set file that is periodically removed. 
If this processor has an incoming connection, it" +@Tags({"hadoop", "HDFS", "delete", "remove", "filesystem", "restricted"}) +@CapabilityDescription("Deletes one or more files or directories from HDFS. The path can be provided as an attribute from an incoming FlowFile, " + + "or a statically set path that is periodically removed. If this processor has an incoming connection, it" + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. " - + "Optionally, you may specify use a wildcard character to match multiple files or directories.") + + "Note that you may use a wildcard character to match multiple files or directories. If there are" + + " no incoming connections no flowfiles will be transfered to any output relationships. If there is an incoming" + + " flowfile then provided there are no detected failures it will be transferred to success otherwise it will be sent to false. If" + + " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. 
") @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") +@SeeAlso({ListHDFS.class}) public class DeleteHDFS extends AbstractHadoopProcessor { + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("FlowFiles will be routed here if the delete command was successful") + .description("When an incoming flowfile is used then if there are no errors invoking delete the flowfile will route here.") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("FlowFiles will be routed here if the delete command was unsuccessful") + .description("When an incoming flowfile is used and there is a failure while deleting then the flowfile will route here.") .build(); public static final PropertyDescriptor FILE_OR_DIRECTORY = new PropertyDescriptor.Builder() .name("file_or_directory") - .displayName("File or Directory") + .displayName("Path") .description("The HDFS file or directory to delete. 
A wildcard expression may be used to only delete certain files") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -109,20 +113,20 @@ public class DeleteHDFS extends AbstractHadoopProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - String fileOrDirectoryName = null; - FlowFile flowFile = session.get(); + final FlowFile originalFlowFile = session.get(); // If this processor has an incoming connection, then do not run unless a // FlowFile is actually sent through - if (flowFile == null && context.hasIncomingConnection()) { + if (originalFlowFile == null && context.hasIncomingConnection()) { context.yield(); return; } - if (flowFile != null) { - fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - } else { + final String fileOrDirectoryName; + if (originalFlowFile == null) { fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue(); + } else { + fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(originalFlowFile).getValue(); } final FileSystem fileSystem = getFileSystem(); @@ -140,30 +144,21 @@ public class DeleteHDFS extends AbstractHadoopProcessor { pathList.add(new Path(fileOrDirectoryName)); } - Map attributes = Maps.newHashMapWithExpectedSize(2); for (Path path : pathList) { - attributes.put("filename", path.getName()); - attributes.put("path", path.getParent().toString()); if (fileSystem.exists(path)) { fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean()); - if (!context.hasIncomingConnection()) { - flowFile = session.create(); - } - session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS); - } else { - getLogger().warn("File (" + path + ") does not exist"); - if (!context.hasIncomingConnection()) { - flowFile = session.create(); - } - session.transfer(session.putAllAttributes(flowFile, 
attributes), REL_FAILURE); + getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()}); } } + if (originalFlowFile != null) { + session.transfer(originalFlowFile, DeleteHDFS.REL_SUCCESS); + } } catch (IOException e) { - getLogger().warn("Error processing delete for file or directory", e); - if (flowFile != null) { - session.rollback(true); + if (originalFlowFile != null) { + getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{originalFlowFile, e.getMessage()}, e); + session.transfer(originalFlowFile, DeleteHDFS.REL_FAILURE); } } - } + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java index 0cb371c83f..be16ac63b7 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; @@ -25,15 +24,12 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; -import java.util.List; import java.util.Map; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hadoop.KerberosProperties; -import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; 
@@ -55,6 +51,7 @@ public class TestDeleteHDFS { mockFileSystem = mock(FileSystem.class); } + //Tests the case where a file is found and deleted but there was no incoming connection @Test public void testSuccessfulDelete() throws Exception { Path filePath = new Path("/some/path/to/file.txt"); @@ -66,11 +63,8 @@ public class TestDeleteHDFS { runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString()); runner.assertValid(); runner.run(); - runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS); - runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1); - FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0); - assertEquals(filePath.getName(), flowFile.getAttribute("filename")); - assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path")); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0); } @Test @@ -86,9 +80,6 @@ public class TestDeleteHDFS { runner.run(); runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS); runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1); - FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0); - assertEquals(filePath.getName(), flowFile.getAttribute("filename")); - assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path")); } @Test @@ -102,9 +93,7 @@ public class TestDeleteHDFS { attributes.put("hdfs.file", filePath.toString()); runner.enqueue("foo", attributes); runner.run(); - runner.assertQueueNotEmpty(); - runner.assertPenalizeCount(1); - assertEquals(1, runner.getQueueSize().getObjectCount()); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); } @Test @@ -131,11 +120,7 @@ public class TestDeleteHDFS { runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString()); runner.assertValid(); runner.run(); - runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_FAILURE); - runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1); - FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0); - assertEquals(filePath.getName(), flowFile.getAttribute("filename")); - assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path")); + runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0); } @Test @@ -158,14 +143,32 @@ public class TestDeleteHDFS { runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString()); runner.assertValid(); runner.run(); - runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS); - runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, fileCount); - List flowFiles = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0); + } + + @Test + public void testGlobDeleteFromIncomingFlowFile() throws Exception { + Path glob = new Path("/data/for/2017/08/05/*"); + int fileCount = 300; + FileStatus[] fileStatuses = new FileStatus[fileCount]; for (int i = 0; i < fileCount; i++) { - FlowFile flowFile = flowFiles.get(i); - assertEquals("file" + i, flowFile.getAttribute("filename")); - assertEquals("/data/for/2017/08/05", flowFile.getAttribute("path")); + Path file = new Path("/data/for/2017/08/05/file" + i); + FileStatus fileStatus = mock(FileStatus.class); + when(fileStatus.getPath()).thenReturn(file); + fileStatuses[i] = fileStatus; } + when(mockFileSystem.exists(any(Path.class))).thenReturn(true); + when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); + DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem); + TestRunner runner = TestRunners.newTestRunner(deleteHDFS); + runner.setIncomingConnection(true); + Map attributes = Maps.newHashMap(); + runner.enqueue("foo", attributes); + runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString()); + runner.assertValid(); + runner.run(); + runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS); + runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1); } private static class TestableDeleteHDFS extends 
DeleteHDFS {