HDFS-3318. Use BoundedInputStream in ByteRangeInputStream, otherwise, it hangs on transfers >2 GB. Contributed by Daryn Sharp
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1330500 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e84b552713
commit
f86352c2df
|
@ -898,6 +898,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
HDFS-3312. In HftpFileSystem, the namenode URI is non-secure but the
|
HDFS-3312. In HftpFileSystem, the namenode URI is non-secure but the
|
||||||
delegation tokens have to use secure URI. (Daryn Sharp via szetszwo)
|
delegation tokens have to use secure URI. (Daryn Sharp via szetszwo)
|
||||||
|
|
||||||
|
HDFS-3318. Use BoundedInputStream in ByteRangeInputStream, otherwise, it
|
||||||
|
hangs on transfers >2 GB. (Daryn Sharp via szetszwo)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.InputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
|
import org.apache.commons.io.input.BoundedInputStream;
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||||
|
|
||||||
|
@ -106,8 +107,14 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
||||||
checkResponseCode(connection);
|
checkResponseCode(connection);
|
||||||
|
|
||||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
||||||
filelength = (cl == null) ? -1 : Long.parseLong(cl);
|
if (cl == null) {
|
||||||
in = connection.getInputStream();
|
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
||||||
|
}
|
||||||
|
final long streamlength = Long.parseLong(cl);
|
||||||
|
filelength = startPos + streamlength;
|
||||||
|
// Java has a bug with >2GB request streams. It won't bounds check
|
||||||
|
// the reads so the transfer blocks until the server times out
|
||||||
|
in = new BoundedInputStream(connection.getInputStream(), streamlength);
|
||||||
|
|
||||||
resolvedURL.setURL(getResolvedUrl(connection));
|
resolvedURL.setURL(getResolvedUrl(connection));
|
||||||
status = StreamStatus.NORMAL;
|
status = StreamStatus.NORMAL;
|
||||||
|
@ -116,22 +123,28 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
||||||
return in;
|
return in;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void update(final boolean isEOF, final int n)
|
private int update(final int n) throws IOException {
|
||||||
throws IOException {
|
if (n != -1) {
|
||||||
if (!isEOF) {
|
|
||||||
currentPos += n;
|
currentPos += n;
|
||||||
} else if (currentPos < filelength) {
|
} else if (currentPos < filelength) {
|
||||||
throw new IOException("Got EOF but currentPos = " + currentPos
|
throw new IOException("Got EOF but currentPos = " + currentPos
|
||||||
+ " < filelength = " + filelength);
|
+ " < filelength = " + filelength);
|
||||||
}
|
}
|
||||||
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
final int b = getInputStream().read();
|
final int b = getInputStream().read();
|
||||||
update(b == -1, 1);
|
update((b == -1) ? -1 : 1);
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(byte b[], int off, int len) throws IOException {
|
||||||
|
return update(getInputStream().read(b, off, len));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Seek to the given offset from the start of the file.
|
* Seek to the given offset from the start of the file.
|
||||||
* The next read() will be from that location. Can't
|
* The next read() will be from that location. Can't
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.io.InputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestByteRangeInputStream {
|
public class TestByteRangeInputStream {
|
||||||
|
@ -84,6 +85,11 @@ public static class MockHttpURLConnection extends HttpURLConnection {
|
||||||
public void setResponseCode(int resCode) {
|
public void setResponseCode(int resCode) {
|
||||||
responseCode = resCode;
|
responseCode = resCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getHeaderField(String field) {
|
||||||
|
return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue