From 26d362b144e15ea4a224e346c340d74e978c134c Mon Sep 17 00:00:00 2001
From: ricky
Date: Wed, 10 Aug 2016 19:14:39 -0400
Subject: [PATCH] NIFI-2547: Add DeleteHDFS Processor

This processor adds the capability to delete files or directories in
HDFS. Paths support both static and expression language values, as well
as globs (e.g. /data/for/2016/07/*). The processor may be used
standalone, or triggered by FlowFiles from an incoming connection.

Signed-off-by: Matt Burgess

Add Glob Matcher with Tests

Also set displayName on properties.

Signed-off-by: Matt Burgess

This closes #850
---
 .../nifi/processors/hadoop/DeleteHDFS.java    | 168 +++++++++++++++
 .../org.apache.nifi.processor.Processor       |   1 +
 .../processors/hadoop/TestDeleteHDFS.java     | 198 ++++++++++++++++++
 3 files changed, 367 insertions(+)
 create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
 create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
new file mode 100644
index 0000000000..371b9e1ef0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem" })
+@CapabilityDescription("Deletes a file or directory from HDFS. The path can be provided as an attribute from an incoming FlowFile, "
+        + "or a statically set path that is periodically removed. If this processor has an incoming connection, it "
+        + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. "
+        + "Optionally, you may use a wildcard character to match multiple files or directories.")
+public class DeleteHDFS extends AbstractHadoopProcessor {
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles will be routed here if the delete command was successful")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles will be routed here if the delete command was unsuccessful")
+            .build();
+
+    public static final PropertyDescriptor FILE_OR_DIRECTORY = new PropertyDescriptor.Builder()
+            .name("file_or_directory")
+            .displayName("File or Directory")
+            .description("The HDFS file or directory to delete. A wildcard expression may be used to only delete certain files")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder()
+            .name("recursive")
+            .displayName("Recursive")
+            .description("Remove contents of a non-empty directory recursively")
+            .allowableValues("true", "false")
+            .required(true)
+            .defaultValue("true")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected final Pattern GLOB_PATTERN = Pattern.compile("\\[|\\]|\\*|\\?|\\^|\\{|\\}|\\\\c");
+    protected final Matcher GLOB_MATCHER = GLOB_PATTERN.matcher("");
+
+    private static final Set<Relationship> relationships;
+
+    static {
+        final Set<Relationship> relationshipSet = new HashSet<>();
+        relationshipSet.add(REL_SUCCESS);
+        relationshipSet.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationshipSet);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(FILE_OR_DIRECTORY);
+        props.add(RECURSIVE);
+        return props;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        String fileOrDirectoryName = null;
+        FlowFile flowFile = session.get();
+
+        // If this processor has an incoming connection, then do not run unless a
+        // FlowFile is actually sent through
+        if (flowFile == null && context.hasIncomingConnection()) {
+            context.yield();
+            return;
+        }
+
+        if (flowFile != null) {
+            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
+        }
+
+        final FileSystem fileSystem = getFileSystem();
+        try {
+            // Check if the user has supplied a file or directory pattern
+            List<Path> pathList = Lists.newArrayList();
+            if (GLOB_MATCHER.reset(fileOrDirectoryName).find()) {
+                FileStatus[] fileStatuses = fileSystem.globStatus(new Path(fileOrDirectoryName));
+                if (fileStatuses != null) {
+                    for (FileStatus fileStatus : fileStatuses) {
+                        pathList.add(fileStatus.getPath());
+                    }
+                }
+            } else {
+                pathList.add(new Path(fileOrDirectoryName));
+            }
+
+            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
+            for (Path path : pathList) {
+                attributes.put("filename", path.getName());
+                attributes.put("path", path.getParent().toString());
+                if (fileSystem.exists(path)) {
+                    fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+                    if (!context.hasIncomingConnection()) {
+                        flowFile = session.create();
+                    }
+                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
+                } else {
+                    getLogger().warn("File (" + path + ") does not exist");
+                    if (!context.hasIncomingConnection()) {
+                        flowFile = session.create();
+                    }
+                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_FAILURE);
+                }
+            }
+        } catch (IOException e) {
+            getLogger().warn("Error processing delete for file or directory", e);
+            if (flowFile != null) {
+                session.rollback(true);
+            }
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ef81091291..165ec2c087 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,3 +19,4 @@ org.apache.nifi.processors.hadoop.GetHDFSSequenceFile
 org.apache.nifi.processors.hadoop.inotify.GetHDFSEvents
 org.apache.nifi.processors.hadoop.ListHDFS
 org.apache.nifi.processors.hadoop.PutHDFS
+org.apache.nifi.processors.hadoop.DeleteHDFS
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
new file mode 100644
index 0000000000..89e3be35b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestDeleteHDFS {
+    private NiFiProperties mockNiFiProperties;
+    private FileSystem mockFileSystem;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() throws Exception {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = KerberosProperties.create(mockNiFiProperties);
+        mockFileSystem = mock(FileSystem.class);
+    }
+
+    @Test
+    public void testSuccessfulDelete() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
+        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
+        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+    }
+
+    @Test
+    public void testDeleteFromIncomingFlowFile() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+        Map<String, String> attributes = Maps.newHashMap();
+        attributes.put("hdfs.file", filePath.toString());
+        runner.enqueue("foo", attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
+        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
+        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+    }
+
+    @Test
+    public void testIOException() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenThrow(new IOException());
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+        Map<String, String> attributes = Maps.newHashMap();
+        attributes.put("hdfs.file", filePath.toString());
+        runner.enqueue("foo", attributes);
+        runner.run();
+        runner.assertQueueNotEmpty();
+        runner.assertPenalizeCount(1);
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testNoFlowFilesWithIncomingConnection() throws Exception {
+        Path filePath = new Path("${hdfs.file}");
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.setIncomingConnection(true);
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testUnsuccessfulDelete() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(false);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_FAILURE);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
+        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
+        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
+        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+    }
+
+    @Test
+    public void testGlobDelete() throws Exception {
+        Path glob = new Path("/data/for/2017/08/05/*");
+        int fileCount = 300;
+        FileStatus[] fileStatuses = new FileStatus[fileCount];
+        for (int i = 0; i < fileCount; i++) {
+            Path file = new Path("/data/for/2017/08/05/file" + i);
+            FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
+        }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, fileCount);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS);
+        for (int i = 0; i < fileCount; i++) {
+            FlowFile flowFile = flowFiles.get(i);
+            assertEquals("file" + i, flowFile.getAttribute("filename"));
+            assertEquals("/data/for/2017/08/05", flowFile.getAttribute("path"));
+        }
+    }
+
+    private static class TestableDeleteHDFS extends DeleteHDFS {
+        private KerberosProperties testKerberosProperties;
+        private FileSystem mockFileSystem;
+
+        public TestableDeleteHDFS(KerberosProperties kerberosProperties, FileSystem mockFileSystem) {
+            this.testKerberosProperties = kerberosProperties;
+            this.mockFileSystem = mockFileSystem;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return testKerberosProperties;
+        }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return mockFileSystem;
+        }
+    }
+
+    @Test
+    public void testGlobMatcher() throws Exception {
+        DeleteHDFS deleteHDFS = new DeleteHDFS();
+        assertTrue(deleteHDFS.GLOB_MATCHER.reset("/data/for/08/09/*").find());
+        assertTrue(deleteHDFS.GLOB_MATCHER.reset("/data/for/08/09/[01-04]").find());
+        assertTrue(deleteHDFS.GLOB_MATCHER.reset("/data/for/0?/09/").find());
+        assertFalse(deleteHDFS.GLOB_MATCHER.reset("/data/for/08/09").find());
+    }
+}
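
Reviewer note (not part of the patch): the following minimal sketch mirrors
the glob handling above, in case anyone wants to try the behavior outside
NiFi. It scans the supplied value for glob characters, expands matches with
FileSystem#globStatus, and otherwise treats the value as a literal Path. The
class name GlobDeleteExample, the default path, and the plain local
Configuration are illustrative assumptions; it requires hadoop-client on the
classpath and fs.defaultFS pointing at the target cluster.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical demo class, not part of this patch
    public class GlobDeleteExample {
        // Same special-character scan the processor uses to tell a glob
        // from a literal path
        private static final Pattern GLOB = Pattern.compile("\\[|\\]|\\*|\\?|\\^|\\{|\\}|\\\\c");

        public static void main(String[] args) throws IOException {
            // Illustrative default; pass a real path or glob as the first argument
            String target = args.length > 0 ? args[0] : "/data/for/2016/07/*";
            FileSystem fs = FileSystem.get(new Configuration());

            List<Path> paths = new ArrayList<>();
            if (GLOB.matcher(target).find()) {
                // Expand the glob; globStatus may return null when the path does not exist
                FileStatus[] statuses = fs.globStatus(new Path(target));
                if (statuses != null) {
                    for (FileStatus status : statuses) {
                        paths.add(status.getPath());
                    }
                }
            } else {
                paths.add(new Path(target));
            }

            for (Path path : paths) {
                if (fs.exists(path)) {
                    // true = recursive, matching the processor's default
                    fs.delete(path, true);
                    System.out.println("Deleted " + path);
                } else {
                    System.out.println("Skipped missing path " + path);
                }
            }
        }
    }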