From 76e8c51e11ce42fc99d48722507e101a05bec868 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 30 Jan 2020 10:50:29 -0500 Subject: [PATCH] NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close Signed-off-by: Joe Witt --- .../nifi/processors/hadoop/PutHDFS.java | 6 +-- .../nifi/processors/hadoop/PutHDFSTest.java | 48 ++++++++++++++++++- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 37c306c19f..b6c43282c6 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -353,7 +352,7 @@ public class PutHDFS extends AbstractHadoopProcessor { if (fos != null) { fos.close(); } - } catch (RemoteException re) { + } catch (Throwable t) { // when talking to remote HDFS clusters, we don't notice problems until fos.close() if (createdFile != null) { try { @@ -361,8 +360,7 @@ public class PutHDFS extends AbstractHadoopProcessor { } catch (Throwable ignore) { } } - throw re; - } catch (Throwable ignore) { + throw t; } fos = null; } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 9c51f34c72..94130180e4 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -432,6 +432,33 @@ public class PutHDFSTest { fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission()); } + @Test + public void testPutFileWithCloseException() throws IOException { + mockFileSystem = new MockFileSystem(true); + String dirName = "target/testPutFileCloseException"; + File file = new File(dirName); + file.mkdirs(); + Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory()); + + TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem)); + runner.setProperty(PutHDFS.DIRECTORY, dirName); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + + List failedFlowFiles = runner + .getFlowFilesForRelationship(PutHDFS.REL_FAILURE); + assertFalse(failedFlowFiles.isEmpty()); + assertTrue(failedFlowFiles.get(0).isPenalized()); + + mockFileSystem.delete(p, true); + } + private class TestablePutHDFS extends PutHDFS { private KerberosProperties testKerberosProperties; @@ -461,6 +488,15 @@ public class PutHDFSTest { private class MockFileSystem extends FileSystem { private final Map pathToStatus = new HashMap<>(); + private final boolean failOnClose; + + public MockFileSystem() { + failOnClose = false; + } + + public MockFileSystem(boolean failOnClose) { + this.failOnClose = failOnClose; + } @Override public URI getUri() { @@ -476,7 +512,17 @@ public class PutHDFSTest { public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) { pathToStatus.put(f, newFile(f, permission)); - return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); + if(failOnClose) { + return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) { + @Override + public void close() throws IOException { + super.close(); + throw new IOException("Fail on close"); + } + }; + } else { + return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); + } } @Override