svn merge -c 1374122 from trunk for HDFS-3788. ByteRangeInputStream should not expect HTTP Content-Length header when chunked transfer-encoding is used.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1374124 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-08-17 01:44:31 +00:00
parent 2fc30022d0
commit c849ef9bea
2 changed files with 55 additions and 17 deletions

View File

@ -467,6 +467,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3808. fuse_dfs: postpone libhdfs intialization until after fork. HDFS-3808. fuse_dfs: postpone libhdfs intialization until after fork.
(Colin Patrick McCabe via atm) (Colin Patrick McCabe via atm)
HDFS-3788. ByteRangeInputStream should not expect HTTP Content-Length header
when chunked transfer-encoding is used. (szetszwo)
BREAKDOWN OF HDFS-3042 SUBTASKS BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd) HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -22,12 +22,15 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
/** /**
* To support HTTP byte streams, a new connection to an HTTP server needs to be * To support HTTP byte streams, a new connection to an HTTP server needs to be
@ -70,7 +73,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
protected URLOpener resolvedURL; protected URLOpener resolvedURL;
protected long startPos = 0; protected long startPos = 0;
protected long currentPos = 0; protected long currentPos = 0;
protected long filelength; protected Long fileLength = null;
StreamStatus status = StreamStatus.SEEK; StreamStatus status = StreamStatus.SEEK;
@ -114,28 +117,60 @@ public abstract class ByteRangeInputStream extends FSInputStream {
final URLOpener opener = resolved? resolvedURL: originalURL; final URLOpener opener = resolved? resolvedURL: originalURL;
final HttpURLConnection connection = opener.connect(startPos, resolved); final HttpURLConnection connection = opener.connect(startPos, resolved);
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
if (cl == null) {
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
}
final long streamlength = Long.parseLong(cl);
filelength = startPos + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
InputStream is =
new BoundedInputStream(connection.getInputStream(), streamlength);
resolvedURL.setURL(getResolvedUrl(connection)); resolvedURL.setURL(getResolvedUrl(connection));
return is; InputStream in = connection.getInputStream();
final Map<String, List<String>> headers = connection.getHeaderFields();
if (isChunkedTransferEncoding(headers)) {
// file length is not known
fileLength = null;
} else {
// for non-chunked transfer-encoding, get content-length
final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
if (cl == null) {
throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+ headers);
}
final long streamlength = Long.parseLong(cl);
fileLength = startPos + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
in = new BoundedInputStream(in, streamlength);
}
return in;
}
private static boolean isChunkedTransferEncoding(
final Map<String, List<String>> headers) {
return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
|| contains(headers, HttpHeaders.TE, "chunked");
}
/** Does the HTTP header map contain the given key, value pair? */
private static boolean contains(final Map<String, List<String>> headers,
final String key, final String value) {
final List<String> values = headers.get(key);
if (values != null) {
for(String v : values) {
for(final StringTokenizer t = new StringTokenizer(v, ",");
t.hasMoreTokens(); ) {
if (value.equalsIgnoreCase(t.nextToken())) {
return true;
}
}
}
}
return false;
} }
private int update(final int n) throws IOException { private int update(final int n) throws IOException {
if (n != -1) { if (n != -1) {
currentPos += n; currentPos += n;
} else if (currentPos < filelength) { } else if (fileLength != null && currentPos < fileLength) {
throw new IOException("Got EOF but currentPos = " + currentPos throw new IOException("Got EOF but currentPos = " + currentPos
+ " < filelength = " + filelength); + " < filelength = " + fileLength);
} }
return n; return n;
} }