HDFS-1753. Resource Leak in StreamFile. Contributed by Uma Maheswara Rao G
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1143106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eadb1a0323
commit
1e5e03c73c
|
@ -862,6 +862,8 @@ Release 0.22.0 - Unreleased
|
|||
HDFS-528. Add ability for safemode to wait for a minimum number of
|
||||
live datanodes (Todd Lipcon via eli)
|
||||
|
||||
HDFS-1753. Resource Leak in StreamFile. (Uma Maheswara Rao G via eli)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-1304. Add a new unit test for HftpFileSystem.open(..). (szetszwo)
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Enumeration;
|
||||
|
@ -85,30 +84,40 @@ public class StreamFile extends DfsServlet {
|
|||
return;
|
||||
}
|
||||
|
||||
final DFSInputStream in = dfs.open(filename);
|
||||
final long fileLen = in.getFileLength();
|
||||
OutputStream os = response.getOutputStream();
|
||||
DFSInputStream in = null;
|
||||
OutputStream out = null;
|
||||
|
||||
try {
|
||||
in = dfs.open(filename);
|
||||
out = response.getOutputStream();
|
||||
final long fileLen = in.getFileLength();
|
||||
if (reqRanges != null) {
|
||||
List<InclusiveByteRange> ranges =
|
||||
InclusiveByteRange.satisfiableRanges(reqRanges, fileLen);
|
||||
StreamFile.sendPartialData(in, os, response, fileLen, ranges, true);
|
||||
StreamFile.sendPartialData(in, out, response, fileLen, ranges);
|
||||
} else {
|
||||
// No ranges, so send entire file
|
||||
response.setHeader("Content-Disposition", "attachment; filename=\"" +
|
||||
filename + "\"");
|
||||
response.setContentType("application/octet-stream");
|
||||
response.setHeader(CONTENT_LENGTH, "" + fileLen);
|
||||
StreamFile.copyFromOffset(in, os, 0L, fileLen, true);
|
||||
StreamFile.copyFromOffset(in, out, 0L, fileLen);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("response.isCommitted()=" + response.isCommitted(), e);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
in.close();
|
||||
in = null;
|
||||
out.close();
|
||||
out = null;
|
||||
dfs.close();
|
||||
dfs = null;
|
||||
} catch (IOException ioe) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("response.isCommitted()=" + response.isCommitted(), ioe);
|
||||
}
|
||||
throw ioe;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, in);
|
||||
IOUtils.cleanup(LOG, out);
|
||||
IOUtils.cleanup(LOG, dfs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,15 +131,13 @@ public class StreamFile extends DfsServlet {
|
|||
* @param response http response to use
|
||||
* @param contentLength for the response header
|
||||
* @param ranges to write to respond with
|
||||
* @param close whether to close the streams
|
||||
* @throws IOException on error sending the response
|
||||
*/
|
||||
static void sendPartialData(FSInputStream in,
|
||||
OutputStream out,
|
||||
HttpServletResponse response,
|
||||
long contentLength,
|
||||
List<InclusiveByteRange> ranges,
|
||||
boolean close)
|
||||
List<InclusiveByteRange> ranges)
|
||||
throws IOException {
|
||||
if (ranges == null || ranges.size() != 1) {
|
||||
response.setContentLength(0);
|
||||
|
@ -145,14 +152,14 @@ public class StreamFile extends DfsServlet {
|
|||
singleSatisfiableRange.toHeaderRangeString(contentLength));
|
||||
copyFromOffset(in, out,
|
||||
singleSatisfiableRange.getFirst(contentLength),
|
||||
singleLength, close);
|
||||
singleLength);
|
||||
}
|
||||
}
|
||||
|
||||
/* Copy count bytes at the given offset from one stream to another */
|
||||
static void copyFromOffset(FSInputStream in, OutputStream out, long offset,
|
||||
long count, boolean close) throws IOException {
|
||||
long count) throws IOException {
|
||||
in.seek(offset);
|
||||
IOUtils.copyBytes(in, out, count, close);
|
||||
IOUtils.copyBytes(in, out, count, false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,18 +20,30 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import static org.junit.Assert.assertArrayEquals;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
import java.util.Vector;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.mockito.Mockito;
|
||||
import org.mortbay.jetty.InclusiveByteRange;
|
||||
|
||||
/*
|
||||
|
@ -188,6 +200,28 @@ class MockHttpServletResponse implements HttpServletResponse {
|
|||
|
||||
|
||||
public class TestStreamFile {
|
||||
private HdfsConfiguration CONF = new HdfsConfiguration();
|
||||
private DFSClient clientMock = Mockito.mock(DFSClient.class);
|
||||
private HttpServletRequest mockHttpServletRequest =
|
||||
Mockito.mock(HttpServletRequest.class);
|
||||
private HttpServletResponse mockHttpServletResponse =
|
||||
Mockito.mock(HttpServletResponse.class);
|
||||
private final ServletContext mockServletContext =
|
||||
Mockito.mock(ServletContext.class);
|
||||
|
||||
StreamFile sfile = new StreamFile() {
|
||||
private static final long serialVersionUID = -5513776238875189473L;
|
||||
|
||||
public ServletContext getServletContext() {
|
||||
return mockServletContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DFSClient getDFSClient(HttpServletRequest request)
|
||||
throws IOException, InterruptedException {
|
||||
return clientMock;
|
||||
}
|
||||
};
|
||||
|
||||
// return an array matching the output of mockfsinputstream
|
||||
private static byte[] getOutputArray(int start, int count) {
|
||||
|
@ -220,7 +254,7 @@ public class TestStreamFile {
|
|||
assertTrue("Pairs array must be even", pairs.length % 2 == 0);
|
||||
|
||||
for (int i = 0; i < pairs.length; i+=2) {
|
||||
StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i+1], false);
|
||||
StreamFile.copyFromOffset(fsin, os, pairs[i], pairs[i+1]);
|
||||
assertArrayEquals("Reading " + pairs[i+1]
|
||||
+ " bytes from offset " + pairs[i],
|
||||
getOutputArray(pairs[i], pairs[i+1]),
|
||||
|
@ -246,7 +280,7 @@ public class TestStreamFile {
|
|||
{
|
||||
List<InclusiveByteRange> ranges = strToRanges("0-,10-300", 500);
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
StreamFile.sendPartialData(in, os, response, 500, ranges, false);
|
||||
StreamFile.sendPartialData(in, os, response, 500, ranges);
|
||||
assertEquals("Multiple ranges should result in a 416 error",
|
||||
416, response.getStatus());
|
||||
}
|
||||
|
@ -255,7 +289,7 @@ public class TestStreamFile {
|
|||
{
|
||||
os.reset();
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
StreamFile.sendPartialData(in, os, response, 500, null, false);
|
||||
StreamFile.sendPartialData(in, os, response, 500, null);
|
||||
assertEquals("No ranges should result in a 416 error",
|
||||
416, response.getStatus());
|
||||
}
|
||||
|
@ -264,7 +298,7 @@ public class TestStreamFile {
|
|||
{
|
||||
List<InclusiveByteRange> ranges = strToRanges("600-800", 500);
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
StreamFile.sendPartialData(in, os, response, 500, ranges, false);
|
||||
StreamFile.sendPartialData(in, os, response, 500, ranges);
|
||||
assertEquals("Single (but invalid) range should result in a 416",
|
||||
416, response.getStatus());
|
||||
}
|
||||
|
@ -274,7 +308,7 @@ public class TestStreamFile {
|
|||
{
|
||||
List<InclusiveByteRange> ranges = strToRanges("100-300", 500);
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
StreamFile.sendPartialData(in, os, response, 500, ranges, false);
|
||||
StreamFile.sendPartialData(in, os, response, 500, ranges);
|
||||
assertEquals("Single (valid) range should result in a 206",
|
||||
206, response.getStatus());
|
||||
assertArrayEquals("Byte range from 100-300",
|
||||
|
@ -283,4 +317,108 @@ public class TestStreamFile {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Test for positive scenario
|
||||
@Test
|
||||
public void testDoGetShouldWriteTheFileContentIntoServletOutputStream()
|
||||
throws Exception {
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1)
|
||||
.build();
|
||||
try {
|
||||
Path testFile = createFile();
|
||||
setUpForDoGetTest(cluster, testFile);
|
||||
ServletOutputStreamExtn outStream = new ServletOutputStreamExtn();
|
||||
Mockito.doReturn(outStream).when(mockHttpServletResponse)
|
||||
.getOutputStream();
|
||||
StreamFile sfile = new StreamFile() {
|
||||
|
||||
private static final long serialVersionUID = 7715590481809562722L;
|
||||
|
||||
public ServletContext getServletContext() {
|
||||
return mockServletContext;
|
||||
}
|
||||
};
|
||||
sfile.doGet(mockHttpServletRequest, mockHttpServletResponse);
|
||||
assertEquals("Not writing the file data into ServletOutputStream",
|
||||
outStream.getResult(), "test");
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// Test for cleaning the streams in exception cases also
|
||||
@Test
|
||||
public void testDoGetShouldCloseTheDFSInputStreamIfResponseGetOutPutStreamThrowsAnyException()
|
||||
throws Exception {
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1)
|
||||
.build();
|
||||
try {
|
||||
Path testFile = createFile();
|
||||
|
||||
setUpForDoGetTest(cluster, testFile);
|
||||
|
||||
Mockito.doThrow(new IOException()).when(mockHttpServletResponse)
|
||||
.getOutputStream();
|
||||
DFSInputStream fsMock = Mockito.mock(DFSInputStream.class);
|
||||
|
||||
Mockito.doReturn(fsMock).when(clientMock).open(testFile.toString());
|
||||
|
||||
Mockito.doReturn(Long.valueOf(4)).when(fsMock).getFileLength();
|
||||
|
||||
try {
|
||||
sfile.doGet(mockHttpServletRequest, mockHttpServletResponse);
|
||||
fail("Not throwing the IOException");
|
||||
} catch (IOException e) {
|
||||
Mockito.verify(clientMock, Mockito.atLeastOnce()).close();
|
||||
}
|
||||
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void setUpForDoGetTest(MiniDFSCluster cluster, Path testFile)
|
||||
throws IOException {
|
||||
|
||||
Mockito.doReturn(CONF).when(mockServletContext).getAttribute(
|
||||
JspHelper.CURRENT_CONF);
|
||||
Mockito.doReturn(NameNode.getHostPortString(NameNode.getAddress(CONF)))
|
||||
.when(mockHttpServletRequest).getParameter("nnaddr");
|
||||
Mockito.doReturn(testFile.toString()).when(mockHttpServletRequest)
|
||||
.getPathInfo();
|
||||
}
|
||||
|
||||
static Path writeFile(FileSystem fs, Path f) throws IOException {
|
||||
DataOutputStream out = fs.create(f);
|
||||
try {
|
||||
out.writeBytes("test");
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
assertTrue(fs.exists(f));
|
||||
return f;
|
||||
}
|
||||
|
||||
private Path createFile() throws IOException {
|
||||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path testFile = new Path("/test/mkdirs/doGet");
|
||||
writeFile(fs, testFile);
|
||||
return testFile;
|
||||
}
|
||||
|
||||
public static class ServletOutputStreamExtn extends ServletOutputStream {
|
||||
private StringBuffer buffer = new StringBuffer(3);
|
||||
|
||||
public String getResult() {
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
buffer.append((char) b);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue