diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index e9770548a7b..12a0aa4cea6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -126,6 +127,8 @@ public class DFSOutputStream extends FSOutputSummer
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
+  private boolean leaseRecovered = false;
+  private boolean exceptionInClose = false; //for unit test
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -832,6 +835,39 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
+  @VisibleForTesting
+  public void setExceptionInClose(boolean enable) {
+    exceptionInClose = enable;
+  }
+
+  private class EmulateExceptionInClose {
+    private Random rand = null;
+    private int kickedNum;
+
+    EmulateExceptionInClose(int callNum) {
+      if (exceptionInClose) {
+        rand = new Random();
+      }
+      kickedNum = callNum;
+    }
+
+    void kickRandomException() throws IOException {
+      if (exceptionInClose) {
+        if (kickedNum > 0) {
+          if (rand.nextInt(kickedNum) == 1) {
+            throw new IOException("Emulated random IOException in close");
+          }
+        }
+      }
+    }
+
+    void kickException() throws IOException {
+      if (exceptionInClose) {
+        throw new IOException("Emulated IOException in close");
+      }
+    }
+  }
+
   /**
    * Closes this output stream and releases any system
    * resources associated with this stream.
@@ -854,7 +890,20 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   protected synchronized void closeImpl() throws IOException {
+    boolean recoverOnCloseException = dfsClient.getConfiguration().getBoolean(
+        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
+        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
     if (isClosed()) {
+      if (recoverOnCloseException && !leaseRecovered) {
+        try {
+          dfsClient.endFileLease(fileId);
+          dfsClient.recoverLease(src);
+          leaseRecovered = true;
+        } catch (Exception e) {
+          LOG.warn("Fail to recover lease for {}", src, e);
+        }
+      }
+
       LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
           closed, getStreamer().streamerClosed());
       try {
@@ -871,8 +920,11 @@ public class DFSOutputStream extends FSOutputSummer
       return;
     }
 
+    EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
     try {
-      flushBuffer();       // flush from all upper layers
+      flushBuffer(); // flush from all upper layers
+      // for test
+      eei.kickRandomException();
 
       if (currentPacket != null) {
         enqueueCurrentPacket();
@@ -883,12 +935,28 @@ public class DFSOutputStream extends FSOutputSummer
       }
 
       try {
-        flushInternal();             // flush all data to Datanodes
+        flushInternal(); // flush all data to Datanodes
       } catch (IOException ioe) {
         cleanupAndRethrowIOException(ioe);
       }
+      // for test
+      eei.kickRandomException();
       completeFile();
+      // for test
+      eei.kickException();
     } catch (ClosedChannelException ignored) {
+    } catch (IOException ioe) {
+      if (recoverOnCloseException) {
+        try {
+          dfsClient.endFileLease(fileId);
+          dfsClient.recoverLease(src);
+          leaseRecovered = true;
+        } catch (Exception e) {
+          // Ignore exception rendered by recoverLease. Throw original
+          // exception
+        }
+      }
+      throw ioe;
     } finally {
       // Failures may happen when flushing data.
       // Streamers may keep waiting for the new block information.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 3d934b44ddd..56521ce4ebe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -302,6 +302,9 @@ public interface HdfsClientConfigKeys {
     String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
         PREFIX + "exclude.nodes.cache.expiry.interval.millis";
     long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+    String RECOVER_ON_CLOSE_EXCEPTION_KEY =
+        PREFIX + "recover.on.close.exception";
+    boolean RECOVER_ON_CLOSE_EXCEPTION_DEFAULT = false;
 
     interface ByteArrayManager {
       String PREFIX = Write.PREFIX + "byte-array-manager.";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 581f9548852..a369948da8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -371,6 +373,41 @@ public class TestDFSOutputStream {
     os.close();
   }
 
+  /**
+   * If {@link HdfsClientConfigKeys.Write#RECOVER_ON_CLOSE_EXCEPTION_KEY} is
+   * enabled and an exception happens in close, the local lease should be
in namenode + * should be recovered. + */ + @Test + public void testExceptionInClose() throws Exception { + String testStr = "Test exception in close"; + DistributedFileSystem fs = cluster.getFileSystem(); + Path testFile = new Path("/closeexception"); + fs.getConf().setBoolean( + HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true); + FSDataOutputStream os = fs.create(testFile); + DFSOutputStream dos = + (DFSOutputStream) FieldUtils.readField(os, "wrappedStream", true); + dos.setExceptionInClose(true); + os.write(testStr.getBytes()); + try { + dos.close(); + // There should be exception + Assert.assertTrue(false); + } catch (IOException ioe) { + GenericTestUtils.waitFor(() -> { + boolean closed; + try { + closed = fs.isFileClosed(testFile); + } catch (IOException e) { + return false; + } + return closed; + }, 1000, 5000); + Assert.assertTrue(fs.isFileClosed(testFile)); + } + } + @AfterClass public static void tearDown() { if (cluster != null) {