NIFI-3204 This closes #1561. Fix handling of deleting a path with a wildcard when the processor is invoked via an incoming flowfile

Applied Joseph Witt's patch from the NIFI-3204 JIRA.
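
For reference, a minimal sketch of the new behaviour, borrowing the harness from the updated TestDeleteHDFS below (the deleteHDFS instance and its mocked FileSystem are assumed to be wired up as in that test): an incoming flowfile plus a wildcard path now yields exactly one flowfile on success rather than one flowfile per deleted file.

    // Sketch only: deleteHDFS is assumed to wrap a mocked FileSystem via the
    // TestableDeleteHDFS helper shown in the updated TestDeleteHDFS class.
    TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
    runner.setIncomingConnection(true);
    runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "/data/for/2017/08/05/*");
    runner.enqueue("trigger", new HashMap<String, String>()); // the incoming flowfile drives the delete
    runner.run();

    // The triggering flowfile is routed once to success; per-file flowfiles with
    // filename/path attributes are no longer emitted. Use ListHDFS first if the
    // individual deleted paths need to be known.
    runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
    runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);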

Signed-off-by: joewitt <joewitt@apache.org>
Francois Prunier 2017-03-03 11:41:42 +01:00 committed by joewitt
parent 4bfb905f37
commit 61c799d88b
2 changed files with 56 additions and 58 deletions

DeleteHDFS.java

@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.hadoop;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,33 +38,38 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 
 @TriggerWhenEmpty
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem", "restricted" })
-@CapabilityDescription("Deletes a file from HDFS. The file can be provided as an attribute from an incoming FlowFile, "
-        + "or a statically set file that is periodically removed. If this processor has an incoming connection, it"
+@Tags({"hadoop", "HDFS", "delete", "remove", "filesystem", "restricted"})
+@CapabilityDescription("Deletes one or more files or directories from HDFS. The path can be provided as an attribute from an incoming FlowFile, "
+        + "or a statically set path that is periodically removed. If this processor has an incoming connection, it "
         + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. "
-        + "Optionally, you may specify use a wildcard character to match multiple files or directories.")
+        + "Note that you may use a wildcard character to match multiple files or directories. If there are"
+        + " no incoming connections no flowfiles will be transferred to any output relationships. If there is an incoming"
+        + " flowfile then provided there are no detected failures it will be transferred to success otherwise it will be sent to failure. If"
+        + " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. ")
 @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
+@SeeAlso({ListHDFS.class})
 public class DeleteHDFS extends AbstractHadoopProcessor {
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("FlowFiles will be routed here if the delete command was successful")
+            .description("When an incoming flowfile is used then if there are no errors invoking delete the flowfile will route here.")
             .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("FlowFiles will be routed here if the delete command was unsuccessful")
+            .description("When an incoming flowfile is used and there is a failure while deleting then the flowfile will route here.")
             .build();
     public static final PropertyDescriptor FILE_OR_DIRECTORY = new PropertyDescriptor.Builder()
             .name("file_or_directory")
-            .displayName("File or Directory")
+            .displayName("Path")
             .description("The HDFS file or directory to delete. A wildcard expression may be used to only delete certain files")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -109,20 +113,20 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        String fileOrDirectoryName = null;
-        FlowFile flowFile = session.get();
+        final FlowFile originalFlowFile = session.get();
 
         // If this processor has an incoming connection, then do not run unless a
         // FlowFile is actually sent through
-        if (flowFile == null && context.hasIncomingConnection()) {
+        if (originalFlowFile == null && context.hasIncomingConnection()) {
             context.yield();
             return;
         }
 
-        if (flowFile != null) {
-            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        } else {
+        final String fileOrDirectoryName;
+        if (originalFlowFile == null) {
             fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
+        } else {
+            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(originalFlowFile).getValue();
         }
 
         final FileSystem fileSystem = getFileSystem();
@@ -140,30 +144,21 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                 pathList.add(new Path(fileOrDirectoryName));
             }
 
-            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
             for (Path path : pathList) {
-                attributes.put("filename", path.getName());
-                attributes.put("path", path.getParent().toString());
                 if (fileSystem.exists(path)) {
                     fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
-                    if (!context.hasIncomingConnection()) {
-                        flowFile = session.create();
-                    }
-                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
-                } else {
-                    getLogger().warn("File (" + path + ") does not exist");
-                    if (!context.hasIncomingConnection()) {
-                        flowFile = session.create();
-                    }
-                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_FAILURE);
+                    getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
                 }
             }
+            if (originalFlowFile != null) {
+                session.transfer(originalFlowFile, DeleteHDFS.REL_SUCCESS);
+            }
         } catch (IOException e) {
-            getLogger().warn("Error processing delete for file or directory", e);
-            if (flowFile != null) {
-                session.rollback(true);
+            if (originalFlowFile != null) {
+                getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{originalFlowFile, e.getMessage()}, e);
+                session.transfer(originalFlowFile, DeleteHDFS.REL_FAILURE);
             }
         }
     }
 }

TestDeleteHDFS.java

@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
@@ -25,15 +24,12 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -55,6 +51,7 @@ public class TestDeleteHDFS {
         mockFileSystem = mock(FileSystem.class);
     }
 
+    //Tests the case where a file is found and deleted but there was no incoming connection
     @Test
     public void testSuccessfulDelete() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
@@ -66,11 +63,8 @@
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
-        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
     @Test
@@ -86,9 +80,6 @@
         runner.run();
         runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
         runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
-        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
     }
 
     @Test
@@ -102,9 +93,7 @@
         attributes.put("hdfs.file", filePath.toString());
         runner.enqueue("foo", attributes);
         runner.run();
-        runner.assertQueueNotEmpty();
-        runner.assertPenalizeCount(1);
-        assertEquals(1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
     }
 
     @Test
@@ -131,11 +120,7 @@
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_FAILURE);
-        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
-        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
     @Test
@@ -158,14 +143,32 @@
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, fileCount);
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS);
-        for (int i = 0; i < fileCount; i++) {
-            FlowFile flowFile = flowFiles.get(i);
-            assertEquals("file" + i, flowFile.getAttribute("filename"));
-            assertEquals("/data/for/2017/08/05", flowFile.getAttribute("path"));
-        }
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testGlobDeleteFromIncomingFlowFile() throws Exception {
+        Path glob = new Path("/data/for/2017/08/05/*");
+        int fileCount = 300;
+        FileStatus[] fileStatuses = new FileStatus[fileCount];
+        for (int i = 0; i < fileCount; i++) {
+            Path file = new Path("/data/for/2017/08/05/file" + i);
+            FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
+        }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(true);
+        Map<String, String> attributes = Maps.newHashMap();
+        runner.enqueue("foo", attributes);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
     }
 
     private static class TestableDeleteHDFS extends DeleteHDFS {