HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.

Jing Zhao 2015-07-22 17:42:31 -07:00
parent 06e5dd2c84
commit e91ccfad07
4 changed files with 113 additions and 22 deletions
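
Why this reduces connections: positional reads (pread) on a WebHDFS stream previously fell back to the default implementation inherited from FSInputStream, which is roughly the seek/read/seek-back pattern sketched below. On ByteRangeInputStream every seek() marks the stream for reopening, so a single pread could cost two fresh HTTP connections. The sketch is a paraphrase of the inherited default, not part of this diff:

    // Approximate shape of FSInputStream's default pread (paraphrased):
    // move the shared cursor, read, then restore the cursor.
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      synchronized (this) {
        long oldPos = getPos();
        int nread = -1;
        try {
          seek(position);                        // flips status to SEEK -> reopen
          nread = read(buffer, offset, length);  // reopens the HTTP connection
        } finally {
          seek(oldPos);                          // flips status to SEEK again
        }
        return nread;
      }
    }

The patch instead overrides read(long, ...) and readFully(long, ...) directly in ByteRangeInputStream, opening one short-lived connection at the requested offset and leaving the stateful stream untouched.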

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.web;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -65,6 +66,16 @@ public abstract class ByteRangeInputStream extends FSInputStream {
         final boolean resolved) throws IOException;
   }
 
+  static class InputStreamAndFileLength {
+    final Long length;
+    final InputStream in;
+
+    InputStreamAndFileLength(Long length, InputStream in) {
+      this.length = length;
+      this.in = in;
+    }
+  }
+
   enum StreamStatus {
     NORMAL, SEEK, CLOSED
   }
@@ -101,7 +112,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
       if (in != null) {
         in.close();
       }
-      in = openInputStream();
+      InputStreamAndFileLength fin = openInputStream(startPos);
+      in = fin.in;
+      fileLength = fin.length;
       status = StreamStatus.NORMAL;
       break;
     case CLOSED:
@@ -111,20 +124,22 @@ public abstract class ByteRangeInputStream extends FSInputStream {
   }
 
   @VisibleForTesting
-  protected InputStream openInputStream() throws IOException {
+  protected InputStreamAndFileLength openInputStream(long startOffset)
+      throws IOException {
     // Use the original url if no resolved url exists, eg. if
     // it's the first time a request is made.
     final boolean resolved = resolvedURL.getURL() != null;
     final URLOpener opener = resolved? resolvedURL: originalURL;
-    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    final HttpURLConnection connection = opener.connect(startOffset, resolved);
     resolvedURL.setURL(getResolvedUrl(connection));
 
     InputStream in = connection.getInputStream();
+    final Long length;
     final Map<String, List<String>> headers = connection.getHeaderFields();
     if (isChunkedTransferEncoding(headers)) {
       // file length is not known
-      fileLength = null;
+      length = null;
     } else {
       // for non-chunked transfer-encoding, get content-length
       final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
@@ -133,14 +148,14 @@ public abstract class ByteRangeInputStream extends FSInputStream {
             + headers);
       }
       final long streamlength = Long.parseLong(cl);
-      fileLength = startPos + streamlength;
+      length = startOffset + streamlength;
 
       // Java has a bug with >2GB request streams. It won't bounds check
       // the reads so the transfer blocks until the server times out
       in = new BoundedInputStream(in, streamlength);
     }
 
-    return in;
+    return new InputStreamAndFileLength(length, in);
   }
 
   private static boolean isChunkedTransferEncoding(
@@ -204,6 +219,36 @@ public abstract class ByteRangeInputStream extends FSInputStream {
     }
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    try (InputStream in = openInputStream(position).in) {
+      return in.read(buffer, offset, length);
+    }
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    final InputStreamAndFileLength fin = openInputStream(position);
+    if (fin.length != null && length + position > fin.length) {
+      throw new EOFException("The length to read " + length
+          + " exceeds the file length " + fin.length);
+    }
+    try {
+      int nread = 0;
+      while (nread < length) {
+        int nbytes = fin.in.read(buffer, offset + nread, length - nread);
+        if (nbytes < 0) {
+          throw new EOFException("End of file reached before reading fully.");
+        }
+        nread += nbytes;
+      }
+    } finally {
+      fin.in.close();
+    }
+  }
+
   /**
    * Return the current offset from the start of the file
    */
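
Net effect, as a hypothetical caller-side sketch (the file system setup and path are illustrative, not from the patch): a positional read now uses a dedicated one-shot connection that is closed immediately, while the stateful cursor and its cached file length are left alone.

    FSDataInputStream in = fs.open(new Path("/foo"));  // stateful stream
    byte[] head = new byte[16];
    in.readFully(0, head);   // one-shot connection at offset 0, then closed
    int b = in.read();       // stateful read continues; no extra reopen
    in.close();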

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -742,6 +742,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8795. Improve InvalidateBlocks#node2blocks. (yliu)
 
+    HDFS-8797. WebHdfsFileSystem creates too many connections for pread. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java

@@ -35,7 +35,9 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 
 import com.google.common.net.HttpHeaders;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestByteRangeInputStream {
@@ -140,8 +142,9 @@ public class TestByteRangeInputStream {
   public void testPropagatedClose() throws IOException {
     ByteRangeInputStream bris =
         mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
-    InputStream mockStream = mock(InputStream.class);
-    doReturn(mockStream).when(bris).openInputStream();
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
+        mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
     Whitebox.setInternalState(bris, "status",
         ByteRangeInputStream.StreamStatus.SEEK);
@@ -151,46 +154,46 @@ public class TestByteRangeInputStream {
 
     // first open, shouldn't close underlying stream
     bris.getInputStream();
-    verify(bris, times(++brisOpens)).openInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // stream is open, shouldn't close underlying stream
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // seek forces a reopen, should close underlying stream
     bris.seek(1);
     bris.getInputStream();
-    verify(bris, times(++brisOpens)).openInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(++isCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
 
     // verify that the underlying stream isn't closed after a seek
     // ie. the state was correctly updated
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // seeking to same location should be a no-op
     bris.seek(1);
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // close should of course close
     bris.close();
     verify(bris, times(++brisCloses)).close();
-    verify(mockStream, times(++isCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
 
     // it's already closed, underlying stream should not close
     bris.close();
     verify(bris, times(++brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // it's closed, don't reopen it
     boolean errored = false;
@@ -202,9 +205,9 @@ public class TestByteRangeInputStream {
     } finally {
       assertTrue("Read a closed steam", errored);
     }
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
   }
 }
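
A note on the test changes above: bris is a partial mock (CALLS_REAL_METHODS), so the stub has to use doReturn(...).when(...) rather than when(...).thenReturn(...), which would invoke the real openInputStream during stubbing; Mockito.anyLong() matches whatever offset getInputStream() passes. Because the stub now returns the InputStreamAndFileLength holder, close propagation is verified on the wrapped stream, mockStream.in. A minimal sketch of the pattern (names reused from the test; static imports from org.mockito.Mockito assumed):

    ByteRangeInputStream bris =
        mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
    InputStreamAndFileLength mockStream =
        new InputStreamAndFileLength(1L, mock(InputStream.class));
    // doReturn/when does not call the real method while stubbing.
    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
    Whitebox.setInternalState(bris, "status",
        ByteRangeInputStream.StreamStatus.SEEK);
    bris.getInputStream();   // reopens via the stubbed openInputStream(...)
    verify(bris).openInputStream(Mockito.anyLong());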

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWebHDFS.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.TestDFSClientRetries;
@@ -561,6 +563,45 @@ public class TestWebHDFS {
     }
   }
 
+  @Test
+  public void testWebHdfsPread() throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    byte[] content = new byte[1024];
+    RANDOM.nextBytes(content);
+    final Path foo = new Path("/foo");
+    FSDataInputStream in = null;
+    try {
+      final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+          WebHdfsConstants.WEBHDFS_SCHEME);
+      try (OutputStream os = fs.create(foo)) {
+        os.write(content);
+      }
+
+      // pread
+      in = fs.open(foo, 1024);
+      byte[] buf = new byte[1024];
+      try {
+        in.readFully(1020, buf, 0, 5);
+        Assert.fail("EOF expected");
+      } catch (EOFException ignored) {}
+
+      // mix pread with stateful read
+      int length = in.read(buf, 0, 512);
+      in.readFully(100, new byte[1024], 0, 100);
+      int preadLen = in.read(200, new byte[1024], 0, 200);
+      Assert.assertTrue(preadLen > 0);
+      IOUtils.readFully(in, buf, length, 1024 - length);
+      Assert.assertArrayEquals(content, buf);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 30000)
   public void testGetHomeDirectory() throws Exception {