HDFS-10549. Correctly revoke file leases when closing files. Contributed by Yiqun Lin.
(cherry picked from commit 85aacaadb5a3f8c78b191867c0bde09b3c4b3c3c)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
(cherry picked from commit a20b943cf9
)
This commit is contained in:
parent
d65024eddc
commit
b89d79ca1d
|
@ -471,7 +471,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Stop renewal of lease for the file. */
|
/** Stop renewal of lease for the file. */
|
||||||
void endFileLease(final long inodeId) throws IOException {
|
void endFileLease(final long inodeId) {
|
||||||
getLeaseRenewer().closeFile(inodeId, this);
|
getLeaseRenewer().closeFile(inodeId, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -709,6 +710,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
* resources associated with this stream.
|
* resources associated with this stream.
|
||||||
*/
|
*/
|
||||||
void abort() throws IOException {
|
void abort() throws IOException {
|
||||||
|
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
return;
|
return;
|
||||||
|
@ -717,9 +719,19 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
new IOException("Lease timeout of "
|
new IOException("Lease timeout of "
|
||||||
+ (dfsClient.getConf().getHdfsTimeout() / 1000)
|
+ (dfsClient.getConf().getHdfsTimeout() / 1000)
|
||||||
+ " seconds expired."));
|
+ " seconds expired."));
|
||||||
closeThreads(true);
|
|
||||||
|
try {
|
||||||
|
closeThreads(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
b.add(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dfsClient.endFileLease(fileId);
|
dfsClient.endFileLease(fileId);
|
||||||
|
final IOException ioe = b.build();
|
||||||
|
if (ioe != null) {
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isClosed() {
|
boolean isClosed() {
|
||||||
|
@ -752,13 +764,21 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
try (TraceScope ignored = dfsClient.newPathTraceScope(
|
try (TraceScope ignored = dfsClient.newPathTraceScope(
|
||||||
"DFSOutputStream#close", src)) {
|
"DFSOutputStream#close", src)) {
|
||||||
closeImpl();
|
closeImpl();
|
||||||
|
} catch (IOException e) {
|
||||||
|
b.add(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dfsClient.endFileLease(fileId);
|
dfsClient.endFileLease(fileId);
|
||||||
|
final IOException ioe = b.build();
|
||||||
|
if (ioe != null) {
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void closeImpl() throws IOException {
|
protected synchronized void closeImpl() throws IOException {
|
||||||
|
|
|
@ -105,7 +105,6 @@ import org.mockito.stubbing.Answer;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.mockito.internal.util.reflection.Whitebox;
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -1382,4 +1381,37 @@ public class TestDistributedFileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDFSCloseFilesBeingWritten() throws Exception {
|
||||||
|
Configuration conf = getTestConfiguration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
DistributedFileSystem fileSys = cluster.getFileSystem();
|
||||||
|
|
||||||
|
// Create one file then delete it to trigger the FileNotFoundException
|
||||||
|
// when closing the file.
|
||||||
|
fileSys.create(new Path("/test/dfsclose/file-0"));
|
||||||
|
fileSys.delete(new Path("/test/dfsclose/file-0"), true);
|
||||||
|
|
||||||
|
DFSClient dfsClient = fileSys.getClient();
|
||||||
|
// Construct a new dfsClient to get the same LeaseRenewer instance,
|
||||||
|
// to avoid the original client being added to the leaseRenewer again.
|
||||||
|
DFSClient newDfsClient =
|
||||||
|
new DFSClient(cluster.getFileSystem(0).getUri(), conf);
|
||||||
|
LeaseRenewer leaseRenewer = newDfsClient.getLeaseRenewer();
|
||||||
|
|
||||||
|
dfsClient.closeAllFilesBeingWritten(false);
|
||||||
|
// Remove new dfsClient in leaseRenewer
|
||||||
|
leaseRenewer.closeClient(newDfsClient);
|
||||||
|
|
||||||
|
// The list of clients corresponding to this renewer should be empty
|
||||||
|
assertEquals(true, leaseRenewer.isEmpty());
|
||||||
|
assertEquals(true, dfsClient.isFilesBeingWrittenEmpty());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue