HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.

(cherry picked from commit e91ccfad07)
This commit is contained in:
Jing Zhao 2015-07-22 17:42:31 -07:00
parent 6772c3f4dd
commit 71764a92c6
4 changed files with 113 additions and 22 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.web; package org.apache.hadoop.hdfs.web;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@ -66,6 +67,16 @@ public abstract class ByteRangeInputStream extends FSInputStream {
final boolean resolved) throws IOException; final boolean resolved) throws IOException;
} }
static class InputStreamAndFileLength {
final Long length;
final InputStream in;
InputStreamAndFileLength(Long length, InputStream in) {
this.length = length;
this.in = in;
}
}
enum StreamStatus { enum StreamStatus {
NORMAL, SEEK, CLOSED NORMAL, SEEK, CLOSED
} }
@ -102,7 +113,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
if (in != null) { if (in != null) {
in.close(); in.close();
} }
in = openInputStream(); InputStreamAndFileLength fin = openInputStream(startPos);
in = fin.in;
fileLength = fin.length;
status = StreamStatus.NORMAL; status = StreamStatus.NORMAL;
break; break;
case CLOSED: case CLOSED:
@ -112,31 +125,33 @@ public abstract class ByteRangeInputStream extends FSInputStream {
} }
@VisibleForTesting @VisibleForTesting
protected InputStream openInputStream() throws IOException { protected InputStreamAndFileLength openInputStream(long startOffset)
throws IOException {
// Use the original url if no resolved url exists, eg. if // Use the original url if no resolved url exists, eg. if
// it's the first time a request is made. // it's the first time a request is made.
final boolean resolved = resolvedURL.getURL() != null; final boolean resolved = resolvedURL.getURL() != null;
final URLOpener opener = resolved? resolvedURL: originalURL; final URLOpener opener = resolved? resolvedURL: originalURL;
final HttpURLConnection connection = opener.connect(startPos, resolved); final HttpURLConnection connection = opener.connect(startOffset, resolved);
resolvedURL.setURL(getResolvedUrl(connection)); resolvedURL.setURL(getResolvedUrl(connection));
InputStream in = connection.getInputStream(); InputStream in = connection.getInputStream();
final Long length;
final Map<String, List<String>> headers = connection.getHeaderFields(); final Map<String, List<String>> headers = connection.getHeaderFields();
if (isChunkedTransferEncoding(headers)) { if (isChunkedTransferEncoding(headers)) {
// file length is not known // file length is not known
fileLength = null; length = null;
} else { } else {
// for non-chunked transfer-encoding, get content-length // for non-chunked transfer-encoding, get content-length
long streamlength = getStreamLength(connection, headers); long streamlength = getStreamLength(connection, headers);
fileLength = startPos + streamlength; length = startOffset + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check // Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out // the reads so the transfer blocks until the server times out
in = new BoundedInputStream(in, streamlength); in = new BoundedInputStream(in, streamlength);
} }
return in; return new InputStreamAndFileLength(length, in);
} }
private static long getStreamLength(HttpURLConnection connection, private static long getStreamLength(HttpURLConnection connection,
@ -230,6 +245,36 @@ public abstract class ByteRangeInputStream extends FSInputStream {
} }
} }
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
try (InputStream in = openInputStream(position).in) {
return in.read(buffer, offset, length);
}
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
final InputStreamAndFileLength fin = openInputStream(position);
if (fin.length != null && length + position > fin.length) {
throw new EOFException("The length to read " + length
+ " exceeds the file length " + fin.length);
}
try {
int nread = 0;
while (nread < length) {
int nbytes = fin.in.read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully.");
}
nread += nbytes;
}
} finally {
fin.in.close();
}
}
/** /**
* Return the current offset from the start of the file * Return the current offset from the start of the file
*/ */

View File

@ -399,6 +399,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8795. Improve InvalidateBlocks#node2blocks. (yliu) HDFS-8795. Improve InvalidateBlocks#node2blocks. (yliu)
HDFS-8797. WebHdfsFileSystem creates too many connections for pread. (jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -35,7 +35,9 @@ import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import com.google.common.net.HttpHeaders; import com.google.common.net.HttpHeaders;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
public class TestByteRangeInputStream { public class TestByteRangeInputStream {
@ -140,8 +142,9 @@ public class TestByteRangeInputStream {
public void testPropagatedClose() throws IOException { public void testPropagatedClose() throws IOException {
ByteRangeInputStream bris = ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS); mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
InputStream mockStream = mock(InputStream.class); InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
doReturn(mockStream).when(bris).openInputStream(); mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status", Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK); ByteRangeInputStream.StreamStatus.SEEK);
@ -151,46 +154,46 @@ public class TestByteRangeInputStream {
// first open, shouldn't close underlying stream // first open, shouldn't close underlying stream
bris.getInputStream(); bris.getInputStream();
verify(bris, times(++brisOpens)).openInputStream(); verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close(); verify(bris, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close(); verify(mockStream.in, times(isCloses)).close();
// stream is open, shouldn't close underlying stream // stream is open, shouldn't close underlying stream
bris.getInputStream(); bris.getInputStream();
verify(bris, times(brisOpens)).openInputStream(); verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close(); verify(bris, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close(); verify(mockStream.in, times(isCloses)).close();
// seek forces a reopen, should close underlying stream // seek forces a reopen, should close underlying stream
bris.seek(1); bris.seek(1);
bris.getInputStream(); bris.getInputStream();
verify(bris, times(++brisOpens)).openInputStream(); verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close(); verify(bris, times(brisCloses)).close();
verify(mockStream, times(++isCloses)).close(); verify(mockStream.in, times(++isCloses)).close();
// verify that the underlying stream isn't closed after a seek // verify that the underlying stream isn't closed after a seek
// ie. the state was correctly updated // ie. the state was correctly updated
bris.getInputStream(); bris.getInputStream();
verify(bris, times(brisOpens)).openInputStream(); verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close(); verify(bris, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close(); verify(mockStream.in, times(isCloses)).close();
// seeking to same location should be a no-op // seeking to same location should be a no-op
bris.seek(1); bris.seek(1);
bris.getInputStream(); bris.getInputStream();
verify(bris, times(brisOpens)).openInputStream(); verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close(); verify(bris, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close(); verify(mockStream.in, times(isCloses)).close();
// close should of course close // close should of course close
bris.close(); bris.close();
verify(bris, times(++brisCloses)).close(); verify(bris, times(++brisCloses)).close();
verify(mockStream, times(++isCloses)).close(); verify(mockStream.in, times(++isCloses)).close();
// it's already closed, underlying stream should not close // it's already closed, underlying stream should not close
bris.close(); bris.close();
verify(bris, times(++brisCloses)).close(); verify(bris, times(++brisCloses)).close();
verify(mockStream, times(isCloses)).close(); verify(mockStream.in, times(isCloses)).close();
// it's closed, don't reopen it // it's closed, don't reopen it
boolean errored = false; boolean errored = false;
@ -202,9 +205,9 @@ public class TestByteRangeInputStream {
} finally { } finally {
assertTrue("Read a closed steam", errored); assertTrue("Read a closed steam", errored);
} }
verify(bris, times(brisOpens)).openInputStream(); verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close(); verify(bris, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close(); verify(mockStream.in, times(isCloses)).close();
} }
} }

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@ -45,6 +46,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -574,6 +576,45 @@ public class TestWebHDFS {
} }
} }
@Test
public void testWebHdfsPread() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
byte[] content = new byte[1024];
RANDOM.nextBytes(content);
final Path foo = new Path("/foo");
FSDataInputStream in = null;
try {
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
try (OutputStream os = fs.create(foo)) {
os.write(content);
}
// pread
in = fs.open(foo, 1024);
byte[] buf = new byte[1024];
try {
in.readFully(1020, buf, 0, 5);
Assert.fail("EOF expected");
} catch (EOFException ignored) {}
// mix pread with stateful read
int length = in.read(buf, 0, 512);
in.readFully(100, new byte[1024], 0, 100);
int preadLen = in.read(200, new byte[1024], 0, 200);
Assert.assertTrue(preadLen > 0);
IOUtils.readFully(in, buf, length, 1024 - length);
Assert.assertArrayEquals(content, buf);
} finally {
if (in != null) {
in.close();
}
cluster.shutdown();
}
}
@Test(timeout = 30000) @Test(timeout = 30000)
public void testGetHomeDirectory() throws Exception { public void testGetHomeDirectory() throws Exception {