mirror of https://github.com/apache/nifi.git
NIFI-3600 Improve logging and relationship routing for failures in DeleteHDFS
realized that the session should be cloned here because its inside a for loop and the original flow file would be transferred but not be the latest flow file if an error occurred in the for loop @trixpan at a high level what do you think about this approach? NIFI-3600: Added unit test NIFI-3600: Removed the hdfs.error.code attribute Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #1595
This commit is contained in:
parent
f0dfcc180d
commit
5f65b2561a
|
@ -16,14 +16,26 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.hadoop;
|
package org.apache.nifi.processors.hadoop;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.Restricted;
|
import org.apache.nifi.annotation.behavior.Restricted;
|
||||||
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -33,15 +45,8 @@ import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.google.common.collect.Lists;
|
||||||
import java.util.ArrayList;
|
import com.google.common.collect.Maps;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
|
||||||
|
|
||||||
@TriggerWhenEmpty
|
@TriggerWhenEmpty
|
||||||
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
|
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
|
||||||
|
@ -54,6 +59,11 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
+ " flowfile then provided there are no detected failures it will be transferred to success otherwise it will be sent to false. If"
|
+ " flowfile then provided there are no detected failures it will be transferred to success otherwise it will be sent to false. If"
|
||||||
+ " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. ")
|
+ " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. ")
|
||||||
@Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
|
@Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted"),
|
||||||
|
@WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request"),
|
||||||
|
@WritesAttribute(attribute="hdfs.error.message", description="HDFS error message related to the hdfs.error.code")
|
||||||
|
})
|
||||||
@SeeAlso({ListHDFS.class})
|
@SeeAlso({ListHDFS.class})
|
||||||
public class DeleteHDFS extends AbstractHadoopProcessor {
|
public class DeleteHDFS extends AbstractHadoopProcessor {
|
||||||
|
|
||||||
|
@ -146,8 +156,22 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
|
||||||
|
|
||||||
for (Path path : pathList) {
|
for (Path path : pathList) {
|
||||||
if (fileSystem.exists(path)) {
|
if (fileSystem.exists(path)) {
|
||||||
fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
|
try {
|
||||||
getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
|
fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
|
||||||
|
getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
|
||||||
|
// external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
|
||||||
|
getLogger().warn("Failed to delete file or directory", ioe);
|
||||||
|
|
||||||
|
Map<String, String> attributes = Maps.newHashMapWithExpectedSize(3);
|
||||||
|
attributes.put("hdfs.filename", path.getName());
|
||||||
|
attributes.put("hdfs.path", path.getParent().toString());
|
||||||
|
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
|
||||||
|
attributes.put("hdfs.error.message", ioe.getMessage());
|
||||||
|
|
||||||
|
session.transfer(session.putAllAttributes(session.clone(originalFlowFile), attributes), REL_FAILURE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (originalFlowFile != null) {
|
if (originalFlowFile != null) {
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.hadoop;
|
package org.apache.nifi.processors.hadoop;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
|
@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.nifi.hadoop.KerberosProperties;
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -96,6 +98,25 @@ public class TestDeleteHDFS {
|
||||||
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
|
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPermissionIOException() throws Exception {
|
||||||
|
Path filePath = new Path("/some/path/to/file.txt");
|
||||||
|
when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
|
||||||
|
when(mockFileSystem.delete(any(Path.class), any(Boolean.class))).thenThrow(new IOException("Permissions Error"));
|
||||||
|
DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
|
||||||
|
runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
|
||||||
|
Map<String, String> attributes = Maps.newHashMap();
|
||||||
|
attributes.put("hdfs.file", filePath.toString());
|
||||||
|
runner.enqueue("foo", attributes);
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
|
||||||
|
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
|
||||||
|
assertEquals("file.txt", flowFile.getAttribute("hdfs.filename"));
|
||||||
|
assertEquals("/some/path/to", flowFile.getAttribute("hdfs.path"));
|
||||||
|
assertEquals("Permissions Error", flowFile.getAttribute("hdfs.error.message"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoFlowFilesWithIncomingConnection() throws Exception {
|
public void testNoFlowFilesWithIncomingConnection() throws Exception {
|
||||||
Path filePath = new Path("${hdfs.file}");
|
Path filePath = new Path("${hdfs.file}");
|
||||||
|
|
Loading…
Reference in New Issue