HDFS-7945. The WebHdfs system on DN does not honor the length parameter. Contributed by Haohui Mai.
This commit is contained in:
parent
ca629f4579
commit
8eb97ea14e
|
@ -891,6 +891,9 @@ Release 2.7.0 - UNRELEASED
|
|||
default dfs.journalnode.http-address port 8480 is in use. (Xiaoyu Yao via
|
||||
Arpit Agarwal)
|
||||
|
||||
HDFS-7945. The WebHdfs system on DN does not honor the length parameter.
|
||||
(wheat9)
|
||||
|
||||
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
||||
|
@ -65,6 +66,10 @@ class ParameterParser {
|
|||
return new OffsetParam(param(OffsetParam.NAME)).getOffset();
|
||||
}
|
||||
|
||||
long length() {
|
||||
return new LengthParam(param(LengthParam.NAME)).getLength();
|
||||
}
|
||||
|
||||
String namenodeId() {
|
||||
return new NamenodeAddressParam(param(NamenodeAddressParam.NAME))
|
||||
.getValue();
|
||||
|
|
|
@ -47,8 +47,10 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.LimitInputStream;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -188,6 +190,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
final String nnId = params.namenodeId();
|
||||
final int bufferSize = params.bufferSize();
|
||||
final long offset = params.offset();
|
||||
final long length = params.length();
|
||||
|
||||
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||
HttpHeaders headers = response.headers();
|
||||
|
@ -202,12 +205,20 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
dfsclient.open(path, bufferSize, true));
|
||||
in.seek(offset);
|
||||
|
||||
if (in.getVisibleLength() >= offset) {
|
||||
headers.set(CONTENT_LENGTH, in.getVisibleLength() - offset);
|
||||
long contentLength = in.getVisibleLength() - offset;
|
||||
if (length >= 0) {
|
||||
contentLength = Math.min(contentLength, length);
|
||||
}
|
||||
final InputStream data;
|
||||
if (contentLength >= 0) {
|
||||
headers.set(CONTENT_LENGTH, contentLength);
|
||||
data = new LimitInputStream(in, contentLength);
|
||||
} else {
|
||||
data = in;
|
||||
}
|
||||
|
||||
ctx.write(response);
|
||||
ctx.writeAndFlush(new ChunkedStream(in) {
|
||||
ctx.writeAndFlush(new ChunkedStream(data) {
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
super.close();
|
||||
|
|
|
@ -46,4 +46,9 @@ public class LengthParam extends LongParam {
|
|||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
public long getLength() {
|
||||
Long v = getValue();
|
||||
return v == null ? -1 : v;
|
||||
}
|
||||
}
|
|
@ -21,10 +21,15 @@ package org.apache.hadoop.hdfs.web;
|
|||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
|
@ -45,6 +50,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -523,4 +531,41 @@ public class TestWebHDFS {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWebHdfsOffsetAndLength() throws Exception{
|
||||
MiniDFSCluster cluster = null;
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
final int OFFSET = 42;
|
||||
final int LENGTH = 512;
|
||||
final String PATH = "/foo";
|
||||
byte[] CONTENTS = new byte[1024];
|
||||
RANDOM.nextBytes(CONTENTS);
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
final WebHdfsFileSystem fs =
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
try (OutputStream os = fs.create(new Path(PATH))) {
|
||||
os.write(CONTENTS);
|
||||
}
|
||||
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
||||
URL url = new URL("http", addr.getHostString(), addr
|
||||
.getPort(), WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN" +
|
||||
Param.toSortedString("&", new OffsetParam((long) OFFSET),
|
||||
new LengthParam((long) LENGTH))
|
||||
);
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setInstanceFollowRedirects(true);
|
||||
Assert.assertEquals(LENGTH, conn.getContentLength());
|
||||
byte[] subContents = new byte[LENGTH];
|
||||
byte[] realContents = new byte[LENGTH];
|
||||
System.arraycopy(CONTENTS, OFFSET, subContents, 0, LENGTH);
|
||||
IOUtils.readFully(conn.getInputStream(), realContents);
|
||||
Assert.assertArrayEquals(subContents, realContents);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue