diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 9f16af5c30c..c8036c7bccc 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -828,7 +828,19 @@ public class DFSOutputStream extends FSOutputSummer protected synchronized void closeImpl() throws IOException { if (isClosed()) { - getStreamer().getLastException().check(true); + LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]", + closed, getStreamer().streamerClosed()); + try { + getStreamer().getLastException().check(true); + } catch (IOException ioe) { + cleanupAndRethrowIOException(ioe); + } finally { + if (!closed) { + // If stream is not closed but streamer closed, clean up the stream. + // Most importantly, end the file lease. + closeThreads(true); + } + } return; } @@ -843,14 +855,12 @@ public class DFSOutputStream extends FSOutputSummer setCurrentPacketToEmpty(); } - flushInternal(); // flush all data to Datanodes - // get last block before destroying the streamer - ExtendedBlock lastBlock = getStreamer().getBlock(); - - try (TraceScope ignored = - dfsClient.getTracer().newScope("completeFile")) { - completeFile(lastBlock); + try { + flushInternal(); // flush all data to Datanodes + } catch (IOException ioe) { + cleanupAndRethrowIOException(ioe); } + completeFile(); } catch (ClosedChannelException ignored) { } finally { // Failures may happen when flushing data. @@ -862,6 +872,43 @@ public class DFSOutputStream extends FSOutputSummer } } + private void completeFile() throws IOException { + // get last block before destroying the streamer + ExtendedBlock lastBlock = getStreamer().getBlock(); + try (TraceScope ignored = + dfsClient.getTracer().newScope("completeFile")) { + completeFile(lastBlock); + } + } + + /** + * Determines whether an IOException thrown needs extra cleanup on the stream. + * Space quota exceptions will be thrown when getting new blocks, so the + * open HDFS file need to be closed. + * + * @param ioe the IOException + * @return whether the stream needs cleanup for the given IOException + */ + private boolean exceptionNeedsCleanup(IOException ioe) { + return ioe instanceof DSQuotaExceededException + || ioe instanceof QuotaByStorageTypeExceededException; + } + + private void cleanupAndRethrowIOException(IOException ioe) + throws IOException { + if (exceptionNeedsCleanup(ioe)) { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + b.add(ioe); + try { + completeFile(); + } catch (IOException e) { + b.add(e); + throw b.build(); + } + } + throw ioe; + } + // should be called holding (this) lock since setTestFilename() may // be called during unit tests protected void completeFile(ExtendedBlock last) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java index e33d024e853..957c0a9bf68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java @@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private public class LeaseRenewer { - static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class); + public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class); private static long leaseRenewerGraceDefault = 60*1000L; static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java index 3ad18114a69..48092c72445 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java @@ -35,6 +35,7 @@ import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Scanner; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataOutputStream; @@ -43,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; @@ -58,14 +60,21 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import com.google.common.base.Charsets; import com.google.common.collect.Lists; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.event.Level; +import org.slf4j.LoggerFactory; /** A class for testing quota-related commands */ public class TestQuota { - + + private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class); + private static Configuration conf = null; private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream(); private static final ByteArrayOutputStream ERR_STREAM = new ByteArrayOutputStream(); @@ -77,6 +86,9 @@ public class TestQuota { /* set a smaller block size so that we can test with smaller space quotas */ private static final int DEFAULT_BLOCK_SIZE = 512; + @Rule + public final Timeout testTestout = new Timeout(120000); + @BeforeClass public static void setUpClass() throws Exception { conf = new HdfsConfiguration(); @@ -1462,6 +1474,101 @@ public class TestQuota { "clrSpaceQuota"); } + @Test + public void testSpaceQuotaExceptionOnClose() throws Exception { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE); + final DFSAdmin dfsAdmin = new DFSAdmin(conf); + final Path dir = new Path(PathUtils.getTestPath(getClass()), + GenericTestUtils.getMethodName()); + assertTrue(dfs.mkdirs(dir)); + final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()}; + assertEquals(0, ToolRunner.run(dfsAdmin, args)); + + final Path testFile = new Path(dir, "file"); + final FSDataOutputStream stream = dfs.create(testFile); + stream.write("whatever".getBytes()); + try { + stream.close(); + fail("close should fail"); + } catch (DSQuotaExceededException expected) { + } + + assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction()); + } + + @Test + public void testSpaceQuotaExceptionOnFlush() throws Exception { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE); + GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE); + final DFSAdmin dfsAdmin = new DFSAdmin(conf); + final Path dir = new Path(PathUtils.getTestPath(getClass()), + GenericTestUtils.getMethodName()); + assertTrue(dfs.mkdirs(dir)); + final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()}; + assertEquals(0, ToolRunner.run(dfsAdmin, args)); + + Path testFile = new Path(dir, "file"); + FSDataOutputStream stream = dfs.create(testFile); + // get the lease renewer now so we can verify it later without calling + // getLeaseRenewer, which will automatically add the client into it. + final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer(); + stream.write("whatever".getBytes()); + try { + stream.hflush(); + fail("flush should fail"); + } catch (DSQuotaExceededException expected) { + } + // even if we close the stream in finially, it won't help. + try { + stream.close(); + fail("close should fail too"); + } catch (DSQuotaExceededException expected) { + } + + GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LOG.info("LeaseRenewer: {}", leaseRenewer); + return leaseRenewer.isEmpty(); + } + }, 100, 10000); + assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction()); + } + + @Test + public void testSpaceQuotaExceptionOnAppend() throws Exception { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE); + final DFSAdmin dfsAdmin = new DFSAdmin(conf); + final Path dir = new Path(PathUtils.getTestPath(getClass()), + GenericTestUtils.getMethodName()); + dfs.delete(dir, true); + assertTrue(dfs.mkdirs(dir)); + final String[] args = + new String[] {"-setSpaceQuota", "4000", dir.toString()}; + ToolRunner.run(dfsAdmin, args); + + final Path testFile = new Path(dir, "file"); + OutputStream stream = dfs.create(testFile); + stream.write("whatever".getBytes()); + stream.close(); + + assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction()); + + stream = dfs.append(testFile); + byte[] buf = AppendTestUtil.initBuffer(4096); + stream.write(buf); + try { + stream.close(); + fail("close after append should fail"); + } catch (DSQuotaExceededException expected) { + } + assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction()); + } + private void testSetAndClearSpaceQuotaNoAccessInternal( final String[] args, final int cmdRet,