NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Matthew Burgess 2020-01-30 10:50:29 -05:00 committed by Joe Witt
parent 850869c6d2
commit 76e8c51e11
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 49 additions and 5 deletions

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -353,7 +352,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
if (fos != null) { if (fos != null) {
fos.close(); fos.close();
} }
} catch (RemoteException re) { } catch (Throwable t) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close() // when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) { if (createdFile != null) {
try { try {
@ -361,8 +360,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
} catch (Throwable ignore) { } catch (Throwable ignore) {
} }
} }
throw re; throw t;
} catch (Throwable ignore) {
} }
fos = null; fos = null;
} }

View File

@ -432,6 +432,33 @@ public class PutHDFSTest {
fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission()); fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
} }
@Test
public void testPutFileWithCloseException() throws IOException {
mockFileSystem = new MockFileSystem(true);
String dirName = "target/testPutFileCloseException";
File file = new File(dirName);
file.mkdirs();
Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
runner.setProperty(PutHDFS.DIRECTORY, dirName);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
runner.enqueue(fis, attributes);
runner.run();
}
List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
assertFalse(failedFlowFiles.isEmpty());
assertTrue(failedFlowFiles.get(0).isPenalized());
mockFileSystem.delete(p, true);
}
private class TestablePutHDFS extends PutHDFS { private class TestablePutHDFS extends PutHDFS {
private KerberosProperties testKerberosProperties; private KerberosProperties testKerberosProperties;
@ -461,6 +488,15 @@ public class PutHDFSTest {
private class MockFileSystem extends FileSystem { private class MockFileSystem extends FileSystem {
private final Map<Path, FileStatus> pathToStatus = new HashMap<>(); private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
private final boolean failOnClose;
public MockFileSystem() {
failOnClose = false;
}
public MockFileSystem(boolean failOnClose) {
this.failOnClose = failOnClose;
}
@Override @Override
public URI getUri() { public URI getUri() {
@ -476,8 +512,18 @@ public class PutHDFSTest {
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) { final long blockSize, final Progressable progress) {
pathToStatus.put(f, newFile(f, permission)); pathToStatus.put(f, newFile(f, permission));
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
@Override
public void close() throws IOException {
super.close();
throw new IOException("Fail on close");
}
};
} else {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
} }
}
@Override @Override
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) { public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {