HDFS-15684. EC: Call recoverLease on DFSStripedOutputStream close exception. Contributed by Hongbing Wang.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Hui Fei <ferhui@apache.org>
This commit is contained in:
He Xiaoqiao 2020-11-23 11:26:52 +08:00
parent d73029463c
commit 641d8856d2
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
3 changed files with 86 additions and 1 deletions

View File

@ -923,7 +923,7 @@ public class DFSOutputStream extends FSOutputSummer
* If recoverLeaseOnCloseException is true and an exception occurs when
* closing a file, recover lease.
*/
private void recoverLease(boolean recoverLeaseOnCloseException) {
protected void recoverLease(boolean recoverLeaseOnCloseException) {
if (recoverLeaseOnCloseException) {
try {
dfsClient.endFileLease(fileId);

View File

@ -73,6 +73,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
/**
* This class supports writing files in striped layout and erasure coded format.
@ -1200,6 +1202,9 @@ public class DFSStripedOutputStream extends DFSOutputStream
@Override
protected synchronized void closeImpl() throws IOException {
boolean recoverLeaseOnCloseException = dfsClient.getConfiguration()
.getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY,
RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT);
try {
if (isClosed()) {
exceptionLastSeen.check(true);
@ -1272,6 +1277,9 @@ public class DFSStripedOutputStream extends DFSOutputStream
}
logCorruptBlocks();
} catch (ClosedChannelException ignored) {
} catch (IOException ioe) {
recoverLease(recoverLeaseOnCloseException);
throw ioe;
} finally {
setClosed();
// shutdown executor of flushAll tasks

View File

@ -17,16 +17,22 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -247,4 +253,75 @@ public class TestDFSStripedOutputStream {
.assertExceptionContains("less than the cell size", expected);
}
}
@Test
public void testExceptionInCloseECFileWithRecoverLease() throws Exception {
Configuration config = new Configuration();
config.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
DFSClient client =
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), config);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream =
spyClient.create("/testExceptionInCloseECFileWithRecoverLease",
FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
(short) 3, 1024*1024, null, 1024, null);
assertTrue("stream should be a DFSStripedOutputStream",
dfsOutputStream instanceof DFSStripedOutputStream);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
doThrow(new IOException("Emulated IOException in close"))
.when(spyDFSOutputStream).completeFile(Mockito.any());
try {
spyDFSOutputStream.close();
fail();
} catch (IOException ioe) {
assertTrue(spyDFSOutputStream.isLeaseRecovered());
waitForFileClosed("/testExceptionInCloseECFileWithRecoverLease");
assertTrue(isFileClosed("/testExceptionInCloseECFileWithRecoverLease"));
}
}
@Test
public void testExceptionInCloseECFileWithoutRecoverLease() throws Exception {
Configuration config = new Configuration();
DFSClient client =
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), config);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream =
spyClient.create("/testExceptionInCloseECFileWithoutRecoverLease",
FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
(short) 3, 1024*1024, null, 1024, null);
assertTrue("stream should be a DFSStripedOutputStream",
dfsOutputStream instanceof DFSStripedOutputStream);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
doThrow(new IOException("Emulated IOException in close"))
.when(spyDFSOutputStream).completeFile(Mockito.any());
try {
spyDFSOutputStream.close();
fail();
} catch (IOException ioe) {
assertFalse(spyDFSOutputStream.isLeaseRecovered());
try {
waitForFileClosed("/testExceptionInCloseECFileWithoutRecoverLease");
} catch (TimeoutException e) {
assertFalse(
isFileClosed("/testExceptionInCloseECFileWithoutRecoverLease"));
}
}
}
private boolean isFileClosed(String path) throws IOException {
return cluster.getFileSystem().isFileClosed(new Path(path));
}
private void waitForFileClosed(String path) throws Exception {
GenericTestUtils.waitFor(() -> {
boolean closed;
try {
closed = isFileClosed(path);
} catch (IOException e) {
return false;
}
return closed;
}, 1000, 5000);
}
}