diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 24fc3643b2a..4abf234de5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -471,7 +471,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** Stop renewal of lease for the file. */ - void endFileLease(final long inodeId) throws IOException { + void endFileLease(final long inodeId) { getLeaseRenewer().closeFile(inodeId, this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 472c41f4c33..8439dc8ff0e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; @@ -709,6 +710,7 @@ public class DFSOutputStream extends FSOutputSummer * resources associated with this stream. */ void abort() throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); synchronized (this) { if (isClosed()) { return; @@ -717,9 +719,19 @@ public class DFSOutputStream extends FSOutputSummer new IOException("Lease timeout of " + (dfsClient.getConf().getHdfsTimeout() / 1000) + " seconds expired.")); - closeThreads(true); + + try { + closeThreads(true); + } catch (IOException e) { + b.add(e); + } } + dfsClient.endFileLease(fileId); + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } } boolean isClosed() { @@ -752,13 +764,21 @@ public class DFSOutputStream extends FSOutputSummer */ @Override public void close() throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); synchronized (this) { try (TraceScope ignored = dfsClient.newPathTraceScope( "DFSOutputStream#close", src)) { closeImpl(); + } catch (IOException e) { + b.add(e); } } + dfsClient.endFileLease(fileId); + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } } protected synchronized void closeImpl() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 6d3525f23c3..65d6cd9fe2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -105,7 +105,6 @@ import org.mockito.stubbing.Answer; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import org.mockito.internal.util.reflection.Whitebox; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1382,4 +1381,37 @@ public class TestDistributedFileSystem { } } + @Test + public void testDFSCloseFilesBeingWritten() throws Exception { + Configuration conf = getTestConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + DistributedFileSystem fileSys = cluster.getFileSystem(); + + // Create one file then delete it to trigger the FileNotFoundException + // when closing the file. + fileSys.create(new Path("/test/dfsclose/file-0")); + fileSys.delete(new Path("/test/dfsclose/file-0"), true); + + DFSClient dfsClient = fileSys.getClient(); + // Construct a new dfsClient to get the same LeaseRenewer instance, + // to avoid the original client being added to the leaseRenewer again. + DFSClient newDfsClient = + new DFSClient(cluster.getFileSystem(0).getUri(), conf); + LeaseRenewer leaseRenewer = newDfsClient.getLeaseRenewer(); + + dfsClient.closeAllFilesBeingWritten(false); + // Remove new dfsClient in leaseRenewer + leaseRenewer.closeClient(newDfsClient); + + // The list of clients corresponding to this renewer should be empty + assertEquals(true, leaseRenewer.isEmpty()); + assertEquals(true, dfsClient.isFilesBeingWrittenEmpty()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } }