HDFS-10549. Correctly revoke file leases when closing files. Contributed by Yiqun Lin.

(cherry picked from commit 85aacaadb5a3f8c78b191867c0bde09b3c4b3c3c)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
This commit is contained in:
Xiao Chen 2016-08-17 15:22:42 -07:00
parent ac8c3ae784
commit a20b943cf9
3 changed files with 55 additions and 3 deletions

View File

@ -471,7 +471,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
/** Stop renewal of lease for the file. */ /** Stop renewal of lease for the file. */
void endFileLease(final long inodeId) throws IOException { void endFileLease(final long inodeId) {
getLeaseRenewer().closeFile(inodeId, this); getLeaseRenewer().closeFile(inodeId, this);
} }

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -709,6 +710,7 @@ public class DFSOutputStream extends FSOutputSummer
* resources associated with this stream. * resources associated with this stream.
*/ */
void abort() throws IOException { void abort() throws IOException {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
synchronized (this) { synchronized (this) {
if (isClosed()) { if (isClosed()) {
return; return;
@ -717,9 +719,19 @@ public class DFSOutputStream extends FSOutputSummer
new IOException("Lease timeout of " new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout() / 1000) + (dfsClient.getConf().getHdfsTimeout() / 1000)
+ " seconds expired.")); + " seconds expired."));
closeThreads(true);
try {
closeThreads(true);
} catch (IOException e) {
b.add(e);
}
} }
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
} }
boolean isClosed() { boolean isClosed() {
@ -752,13 +764,21 @@ public class DFSOutputStream extends FSOutputSummer
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
synchronized (this) { synchronized (this) {
try (TraceScope ignored = dfsClient.newPathTraceScope( try (TraceScope ignored = dfsClient.newPathTraceScope(
"DFSOutputStream#close", src)) { "DFSOutputStream#close", src)) {
closeImpl(); closeImpl();
} catch (IOException e) {
b.add(e);
} }
} }
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
} }
protected synchronized void closeImpl() throws IOException { protected synchronized void closeImpl() throws IOException {

View File

@ -111,7 +111,6 @@ import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -1487,4 +1486,37 @@ public class TestDistributedFileSystem {
} }
} }
@Test
public void testDFSCloseFilesBeingWritten() throws Exception {
Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
DistributedFileSystem fileSys = cluster.getFileSystem();
// Create one file then delete it to trigger the FileNotFoundException
// when closing the file.
fileSys.create(new Path("/test/dfsclose/file-0"));
fileSys.delete(new Path("/test/dfsclose/file-0"), true);
DFSClient dfsClient = fileSys.getClient();
// Construct a new dfsClient to get the same LeaseRenewer instance,
// to avoid the original client being added to the leaseRenewer again.
DFSClient newDfsClient =
new DFSClient(cluster.getFileSystem(0).getUri(), conf);
LeaseRenewer leaseRenewer = newDfsClient.getLeaseRenewer();
dfsClient.closeAllFilesBeingWritten(false);
// Remove new dfsClient in leaseRenewer
leaseRenewer.closeClient(newDfsClient);
// The list of clients corresponding to this renewer should be empty
assertEquals(true, leaseRenewer.isEmpty());
assertEquals(true, dfsClient.isFilesBeingWrittenEmpty());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
} }