HDFS-3577. In DatanodeWebHdfsMethods, use MessageBodyWriter instead of StreamingOutput, otherwise, it will fail to transfer large files.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1362976 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d61c5bcb0b
commit
6e56376023
|
@ -1369,6 +1369,10 @@ Release 0.23.3 - UNRELEASED
|
|||
HDFS-3331. In namenode, check superuser privilege for setBalancerBandwidth
|
||||
and acquire the write lock for finalizeUpgrade. (szetszwo)
|
||||
|
||||
HDFS-3577. In DatanodeWebHdfsMethods, use MessageBodyWriter instead of
|
||||
StreamingOutput, otherwise, it will fail to transfer large files.
|
||||
(szetszwo)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.web.resources;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
@ -40,7 +39,6 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -411,31 +409,10 @@ public class DatanodeWebHdfsMethods {
|
|||
IOUtils.cleanup(LOG, dfsclient);
|
||||
throw ioe;
|
||||
}
|
||||
final HdfsDataInputStream dis = in;
|
||||
final StreamingOutput streaming = new StreamingOutput() {
|
||||
@Override
|
||||
public void write(final OutputStream out) throws IOException {
|
||||
final Long n = length.getValue();
|
||||
HdfsDataInputStream dfsin = dis;
|
||||
DFSClient client = dfsclient;
|
||||
try {
|
||||
if (n == null) {
|
||||
IOUtils.copyBytes(dfsin, out, b);
|
||||
} else {
|
||||
IOUtils.copyBytes(dfsin, out, n, false);
|
||||
}
|
||||
dfsin.close();
|
||||
dfsin = null;
|
||||
dfsclient.close();
|
||||
client = null;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, dfsin);
|
||||
IOUtils.cleanup(LOG, client);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return Response.ok(streaming).type(
|
||||
|
||||
final long n = length.getValue() != null? length.getValue()
|
||||
: in.getVisibleLength();
|
||||
return Response.ok(new OpenEntity(in, n, dfsclient)).type(
|
||||
MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
}
|
||||
case GETFILECHECKSUM:
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.web.resources;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import javax.ws.rs.ext.MessageBodyWriter;
|
||||
import javax.ws.rs.ext.Provider;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
/**
|
||||
* A response entity for a HdfsDataInputStream.
|
||||
*/
|
||||
public class OpenEntity {
|
||||
private final HdfsDataInputStream in;
|
||||
private final long length;
|
||||
private final DFSClient dfsclient;
|
||||
|
||||
OpenEntity(final HdfsDataInputStream in, final long length,
|
||||
final DFSClient dfsclient) {
|
||||
this.in = in;
|
||||
this.length = length;
|
||||
this.dfsclient = dfsclient;
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link MessageBodyWriter} for {@link OpenEntity}.
|
||||
*/
|
||||
@Provider
|
||||
public static class Writer implements MessageBodyWriter<OpenEntity> {
|
||||
|
||||
@Override
|
||||
public boolean isWriteable(Class<?> clazz, Type genericType,
|
||||
Annotation[] annotations, MediaType mediaType) {
|
||||
return clazz == OpenEntity.class
|
||||
&& MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize(OpenEntity e, Class<?> type, Type genericType,
|
||||
Annotation[] annotations, MediaType mediaType) {
|
||||
return e.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OpenEntity e, Class<?> type, Type genericType,
|
||||
Annotation[] annotations, MediaType mediaType,
|
||||
MultivaluedMap<String, Object> httpHeaders, OutputStream out
|
||||
) throws IOException {
|
||||
try {
|
||||
IOUtils.copyBytes(e.in, out, e.length, false);
|
||||
} finally {
|
||||
IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.in);
|
||||
IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.dfsclient);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import java.net.HttpURLConnection;
|
|||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
@ -205,15 +206,20 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
assertEquals(0, count);
|
||||
}
|
||||
|
||||
final Path p = new Path(dir, "file");
|
||||
createFile(p);
|
||||
final byte[] mydata = new byte[1 << 20];
|
||||
new Random().nextBytes(mydata);
|
||||
|
||||
final int one_third = data.length/3;
|
||||
final Path p = new Path(dir, "file");
|
||||
FSDataOutputStream out = fs.create(p, false, 4096, (short)3, 1L << 17);
|
||||
out.write(mydata, 0, mydata.length);
|
||||
out.close();
|
||||
|
||||
final int one_third = mydata.length/3;
|
||||
final int two_third = one_third*2;
|
||||
|
||||
{ //test seek
|
||||
final int offset = one_third;
|
||||
final int len = data.length - offset;
|
||||
final int len = mydata.length - offset;
|
||||
final byte[] buf = new byte[len];
|
||||
|
||||
final FSDataInputStream in = fs.open(p);
|
||||
|
@ -225,13 +231,13 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
|
||||
data[i + offset], buf[i]);
|
||||
mydata[i + offset], buf[i]);
|
||||
}
|
||||
}
|
||||
|
||||
{ //test position read (read the data after the two_third location)
|
||||
final int offset = two_third;
|
||||
final int len = data.length - offset;
|
||||
final int len = mydata.length - offset;
|
||||
final byte[] buf = new byte[len];
|
||||
|
||||
final FSDataInputStream in = fs.open(p);
|
||||
|
@ -240,7 +246,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
|
||||
for (int i = 0; i < buf.length; i++) {
|
||||
assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
|
||||
data[i + offset], buf[i]);
|
||||
mydata[i + offset], buf[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue