HDFS-13164. File not closed if streamer fail with DSQuotaExceededException.
(cherry picked from commit51088d3233
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java (cherry picked from commit80f7165374
)
This commit is contained in:
parent
76d4d5c759
commit
16e14b9969
|
@ -828,7 +828,19 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
protected synchronized void closeImpl() throws IOException {
|
protected synchronized void closeImpl() throws IOException {
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
|
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
|
||||||
|
closed, getStreamer().streamerClosed());
|
||||||
|
try {
|
||||||
getStreamer().getLastException().check(true);
|
getStreamer().getLastException().check(true);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
cleanupAndRethrowIOException(ioe);
|
||||||
|
} finally {
|
||||||
|
if (!closed) {
|
||||||
|
// If stream is not closed but streamer closed, clean up the stream.
|
||||||
|
// Most importantly, end the file lease.
|
||||||
|
closeThreads(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -843,14 +855,12 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
setCurrentPacketToEmpty();
|
setCurrentPacketToEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
flushInternal(); // flush all data to Datanodes
|
flushInternal(); // flush all data to Datanodes
|
||||||
// get last block before destroying the streamer
|
} catch (IOException ioe) {
|
||||||
ExtendedBlock lastBlock = getStreamer().getBlock();
|
cleanupAndRethrowIOException(ioe);
|
||||||
|
|
||||||
try (TraceScope ignored =
|
|
||||||
dfsClient.getTracer().newScope("completeFile")) {
|
|
||||||
completeFile(lastBlock);
|
|
||||||
}
|
}
|
||||||
|
completeFile();
|
||||||
} catch (ClosedChannelException ignored) {
|
} catch (ClosedChannelException ignored) {
|
||||||
} finally {
|
} finally {
|
||||||
// Failures may happen when flushing data.
|
// Failures may happen when flushing data.
|
||||||
|
@ -862,6 +872,43 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void completeFile() throws IOException {
|
||||||
|
// get last block before destroying the streamer
|
||||||
|
ExtendedBlock lastBlock = getStreamer().getBlock();
|
||||||
|
try (TraceScope ignored =
|
||||||
|
dfsClient.getTracer().newScope("completeFile")) {
|
||||||
|
completeFile(lastBlock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines whether an IOException thrown needs extra cleanup on the stream.
|
||||||
|
* Space quota exceptions will be thrown when getting new blocks, so the
|
||||||
|
* open HDFS file need to be closed.
|
||||||
|
*
|
||||||
|
* @param ioe the IOException
|
||||||
|
* @return whether the stream needs cleanup for the given IOException
|
||||||
|
*/
|
||||||
|
private boolean exceptionNeedsCleanup(IOException ioe) {
|
||||||
|
return ioe instanceof DSQuotaExceededException
|
||||||
|
|| ioe instanceof QuotaByStorageTypeExceededException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupAndRethrowIOException(IOException ioe)
|
||||||
|
throws IOException {
|
||||||
|
if (exceptionNeedsCleanup(ioe)) {
|
||||||
|
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||||
|
b.add(ioe);
|
||||||
|
try {
|
||||||
|
completeFile();
|
||||||
|
} catch (IOException e) {
|
||||||
|
b.add(e);
|
||||||
|
throw b.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
|
||||||
// should be called holding (this) lock since setTestFilename() may
|
// should be called holding (this) lock since setTestFilename() may
|
||||||
// be called during unit tests
|
// be called during unit tests
|
||||||
protected void completeFile(ExtendedBlock last) throws IOException {
|
protected void completeFile(ExtendedBlock last) throws IOException {
|
||||||
|
|
|
@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class LeaseRenewer {
|
public class LeaseRenewer {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
|
public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
|
||||||
|
|
||||||
private static long leaseRenewerGraceDefault = 60*1000L;
|
private static long leaseRenewerGraceDefault = 60*1000L;
|
||||||
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Scanner;
|
import java.util.Scanner;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.QuotaUsage;
|
import org.apache.hadoop.fs.QuotaUsage;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||||
|
@ -58,14 +60,21 @@ import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.event.Level;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/** A class for testing quota-related commands */
|
/** A class for testing quota-related commands */
|
||||||
public class TestQuota {
|
public class TestQuota {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class);
|
||||||
|
|
||||||
private static Configuration conf = null;
|
private static Configuration conf = null;
|
||||||
private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream();
|
private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream();
|
||||||
private static final ByteArrayOutputStream ERR_STREAM = new ByteArrayOutputStream();
|
private static final ByteArrayOutputStream ERR_STREAM = new ByteArrayOutputStream();
|
||||||
|
@ -77,6 +86,9 @@ public class TestQuota {
|
||||||
/* set a smaller block size so that we can test with smaller space quotas */
|
/* set a smaller block size so that we can test with smaller space quotas */
|
||||||
private static final int DEFAULT_BLOCK_SIZE = 512;
|
private static final int DEFAULT_BLOCK_SIZE = 512;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final Timeout testTestout = new Timeout(120000);
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpClass() throws Exception {
|
public static void setUpClass() throws Exception {
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
|
@ -1462,6 +1474,101 @@ public class TestQuota {
|
||||||
"clrSpaceQuota");
|
"clrSpaceQuota");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSpaceQuotaExceptionOnClose() throws Exception {
|
||||||
|
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
|
||||||
|
final DFSAdmin dfsAdmin = new DFSAdmin(conf);
|
||||||
|
final Path dir = new Path(PathUtils.getTestPath(getClass()),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
assertTrue(dfs.mkdirs(dir));
|
||||||
|
final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
|
||||||
|
assertEquals(0, ToolRunner.run(dfsAdmin, args));
|
||||||
|
|
||||||
|
final Path testFile = new Path(dir, "file");
|
||||||
|
final FSDataOutputStream stream = dfs.create(testFile);
|
||||||
|
stream.write("whatever".getBytes());
|
||||||
|
try {
|
||||||
|
stream.close();
|
||||||
|
fail("close should fail");
|
||||||
|
} catch (DSQuotaExceededException expected) {
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSpaceQuotaExceptionOnFlush() throws Exception {
|
||||||
|
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
|
||||||
|
final DFSAdmin dfsAdmin = new DFSAdmin(conf);
|
||||||
|
final Path dir = new Path(PathUtils.getTestPath(getClass()),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
assertTrue(dfs.mkdirs(dir));
|
||||||
|
final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
|
||||||
|
assertEquals(0, ToolRunner.run(dfsAdmin, args));
|
||||||
|
|
||||||
|
Path testFile = new Path(dir, "file");
|
||||||
|
FSDataOutputStream stream = dfs.create(testFile);
|
||||||
|
// get the lease renewer now so we can verify it later without calling
|
||||||
|
// getLeaseRenewer, which will automatically add the client into it.
|
||||||
|
final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer();
|
||||||
|
stream.write("whatever".getBytes());
|
||||||
|
try {
|
||||||
|
stream.hflush();
|
||||||
|
fail("flush should fail");
|
||||||
|
} catch (DSQuotaExceededException expected) {
|
||||||
|
}
|
||||||
|
// even if we close the stream in finially, it won't help.
|
||||||
|
try {
|
||||||
|
stream.close();
|
||||||
|
fail("close should fail too");
|
||||||
|
} catch (DSQuotaExceededException expected) {
|
||||||
|
}
|
||||||
|
|
||||||
|
GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
LOG.info("LeaseRenewer: {}", leaseRenewer);
|
||||||
|
return leaseRenewer.isEmpty();
|
||||||
|
}
|
||||||
|
}, 100, 10000);
|
||||||
|
assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSpaceQuotaExceptionOnAppend() throws Exception {
|
||||||
|
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
|
||||||
|
final DFSAdmin dfsAdmin = new DFSAdmin(conf);
|
||||||
|
final Path dir = new Path(PathUtils.getTestPath(getClass()),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
dfs.delete(dir, true);
|
||||||
|
assertTrue(dfs.mkdirs(dir));
|
||||||
|
final String[] args =
|
||||||
|
new String[] {"-setSpaceQuota", "4000", dir.toString()};
|
||||||
|
ToolRunner.run(dfsAdmin, args);
|
||||||
|
|
||||||
|
final Path testFile = new Path(dir, "file");
|
||||||
|
OutputStream stream = dfs.create(testFile);
|
||||||
|
stream.write("whatever".getBytes());
|
||||||
|
stream.close();
|
||||||
|
|
||||||
|
assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
|
||||||
|
|
||||||
|
stream = dfs.append(testFile);
|
||||||
|
byte[] buf = AppendTestUtil.initBuffer(4096);
|
||||||
|
stream.write(buf);
|
||||||
|
try {
|
||||||
|
stream.close();
|
||||||
|
fail("close after append should fail");
|
||||||
|
} catch (DSQuotaExceededException expected) {
|
||||||
|
}
|
||||||
|
assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
|
||||||
|
}
|
||||||
|
|
||||||
private void testSetAndClearSpaceQuotaNoAccessInternal(
|
private void testSetAndClearSpaceQuotaNoAccessInternal(
|
||||||
final String[] args,
|
final String[] args,
|
||||||
final int cmdRet,
|
final int cmdRet,
|
||||||
|
|
Loading…
Reference in New Issue