svn merge -c 1331570 from trunk for HDFS-3334. Fix ByteRangeInputStream stream leakage.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1331572 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-04-27 20:14:43 +00:00
parent 596831bcba
commit 73c378c1e7
4 changed files with 174 additions and 33 deletions

View File

@ -798,6 +798,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3321. Fix safe mode turn off tip message. (Ravi Prakash via szetszwo)
HDFS-3334. Fix ByteRangeInputStream stream leakage. (Daryn Sharp via szetszwo)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,8 @@ import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import com.google.common.annotations.VisibleForTesting;
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
@ -61,7 +63,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
}
enum StreamStatus {
NORMAL, SEEK
NORMAL, SEEK, CLOSED
}
protected InputStream in;
protected URLOpener originalURL;
@ -89,40 +91,51 @@ public abstract class ByteRangeInputStream extends FSInputStream {
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
private InputStream getInputStream() throws IOException {
if (status != StreamStatus.NORMAL) {
if (in != null) {
in.close();
in = null;
}
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
final URLOpener opener =
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
final HttpURLConnection connection = opener.openConnection(startPos);
connection.connect();
checkResponseCode(connection);
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
if (cl == null) {
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
}
final long streamlength = Long.parseLong(cl);
filelength = startPos + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
in = new BoundedInputStream(connection.getInputStream(), streamlength);
resolvedURL.setURL(getResolvedUrl(connection));
status = StreamStatus.NORMAL;
@VisibleForTesting
protected InputStream getInputStream() throws IOException {
switch (status) {
case NORMAL:
break;
case SEEK:
if (in != null) {
in.close();
}
in = openInputStream();
status = StreamStatus.NORMAL;
break;
case CLOSED:
throw new IOException("Stream closed");
}
return in;
}
@VisibleForTesting
protected InputStream openInputStream() throws IOException {
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
final URLOpener opener =
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
final HttpURLConnection connection = opener.openConnection(startPos);
connection.connect();
checkResponseCode(connection);
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
if (cl == null) {
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
}
final long streamlength = Long.parseLong(cl);
filelength = startPos + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
InputStream is =
new BoundedInputStream(connection.getInputStream(), streamlength);
resolvedURL.setURL(getResolvedUrl(connection));
return is;
}
private int update(final int n) throws IOException {
if (n != -1) {
currentPos += n;
@ -150,17 +163,21 @@ public abstract class ByteRangeInputStream extends FSInputStream {
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
@Override
public void seek(long pos) throws IOException {
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
status = StreamStatus.SEEK;
if (status != StreamStatus.CLOSED) {
status = StreamStatus.SEEK;
}
}
}
/**
* Return the current offset from the start of the file
*/
@Override
public long getPos() throws IOException {
return currentPos;
}
@ -169,7 +186,17 @@ public abstract class ByteRangeInputStream extends FSInputStream {
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public void close() throws IOException {
if (in != null) {
in.close();
in = null;
}
status = StreamStatus.CLOSED;
}
}

View File

@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -169,4 +171,74 @@ public static class MockHttpURLConnection extends HttpURLConnection {
"HTTP_OK expected, received 206", e.getMessage());
}
}
@Test
public void testPropagatedClose() throws IOException {
ByteRangeInputStream brs = spy(
new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
InputStream mockStream = mock(InputStream.class);
doReturn(mockStream).when(brs).openInputStream();
int brisOpens = 0;
int brisCloses = 0;
int isCloses = 0;
// first open, shouldn't close underlying stream
brs.getInputStream();
verify(brs, times(++brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// stream is open, shouldn't close underlying stream
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// seek forces a reopen, should close underlying stream
brs.seek(1);
brs.getInputStream();
verify(brs, times(++brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(++isCloses)).close();
// verify that the underlying stream isn't closed after a seek
// ie. the state was correctly updated
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// seeking to same location should be a no-op
brs.seek(1);
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// close should of course close
brs.close();
verify(brs, times(++brisCloses)).close();
verify(mockStream, times(++isCloses)).close();
// it's already closed, underlying stream should not close
brs.close();
verify(brs, times(++brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// it's closed, don't reopen it
boolean errored = false;
try {
brs.getInputStream();
} catch (IOException e) {
errored = true;
assertEquals("Stream closed", e.getMessage());
} finally {
assertTrue("Read a closed steam", errored);
}
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URI;
import java.net.URL;
@ -234,6 +235,45 @@ public class TestHftpFileSystem {
assertEquals('7', in.read());
}
@Test
public void testReadClosedStream() throws IOException {
final Path testFile = new Path("/testfile+2");
FSDataOutputStream os = hdfs.create(testFile, true);
os.writeBytes("0123456789");
os.close();
// ByteRangeInputStream delays opens until reads. Make sure it doesn't
// open a closed stream that has never been opened
FSDataInputStream in = hftpFs.open(testFile);
in.close();
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
// force the stream to connect and then close it
in = hftpFs.open(testFile);
int ch = in.read();
assertEquals('0', ch);
in.close();
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
// make sure seeking doesn't automagically reopen the stream
in.seek(4);
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
}
private void checkClosedStream(InputStream is) {
IOException ioe = null;
try {
is.read();
} catch (IOException e) {
ioe = e;
}
assertNotNull("No exception on closed read", ioe);
assertEquals("Stream closed", ioe.getMessage());
}
public void resetFileSystem() throws IOException {
// filesystem caching has a quirk/bug that it caches based on the user's
// given uri. the result is if a filesystem is instantiated with no port,