diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index db73506cb66..c9e103b677a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1369,6 +1369,10 @@ Release 0.23.3 - UNRELEASED HDFS-3331. In namenode, check superuser privilege for setBalancerBandwidth and acquire the write lock for finalizeUpgrade. (szetszwo) + HDFS-3577. In DatanodeWebHdfsMethods, use MessageBodyWriter instead of + StreamingOutput, otherwise, it will fail to transfer large files. + (szetszwo) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index eb4afe75f85..d8af04c745a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.web.resources; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -40,7 +39,6 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -411,31 +409,10 @@ public class DatanodeWebHdfsMethods { IOUtils.cleanup(LOG, dfsclient); throw ioe; } - final HdfsDataInputStream dis = in; - final StreamingOutput streaming = new StreamingOutput() { - @Override - public void write(final OutputStream out) throws IOException { - final Long n = length.getValue(); - HdfsDataInputStream dfsin = dis; - DFSClient client = dfsclient; - try { - if (n == null) { - IOUtils.copyBytes(dfsin, out, b); - } else { - IOUtils.copyBytes(dfsin, out, n, false); - } - dfsin.close(); - dfsin = null; - dfsclient.close(); - client = null; - } finally { - IOUtils.cleanup(LOG, dfsin); - IOUtils.cleanup(LOG, client); - } - } - }; - - return Response.ok(streaming).type( + + final long n = length.getValue() != null? length.getValue() + : in.getVisibleLength(); + return Response.ok(new OpenEntity(in, n, dfsclient)).type( MediaType.APPLICATION_OCTET_STREAM).build(); } case GETFILECHECKSUM: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java new file mode 100644 index 00000000000..98a53f8e14f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.web.resources; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; + +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.io.IOUtils; + +/** + * A response entity for a HdfsDataInputStream. + */ +public class OpenEntity { + private final HdfsDataInputStream in; + private final long length; + private final DFSClient dfsclient; + + OpenEntity(final HdfsDataInputStream in, final long length, + final DFSClient dfsclient) { + this.in = in; + this.length = length; + this.dfsclient = dfsclient; + } + + /** + * A {@link MessageBodyWriter} for {@link OpenEntity}. + */ + @Provider + public static class Writer implements MessageBodyWriter { + + @Override + public boolean isWriteable(Class clazz, Type genericType, + Annotation[] annotations, MediaType mediaType) { + return clazz == OpenEntity.class + && MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType); + } + + @Override + public long getSize(OpenEntity e, Class type, Type genericType, + Annotation[] annotations, MediaType mediaType) { + return e.length; + } + + @Override + public void writeTo(OpenEntity e, Class type, Type genericType, + Annotation[] annotations, MediaType mediaType, + MultivaluedMap httpHeaders, OutputStream out + ) throws IOException { + try { + IOUtils.copyBytes(e.in, out, e.length, false); + } finally { + IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.in); + IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.dfsclient); + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 2f65c1f0de4..bb1aea0b6f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -26,6 +26,7 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.Map; +import java.util.Random; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; @@ -205,15 +206,20 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { assertEquals(0, count); } - final Path p = new Path(dir, "file"); - createFile(p); + final byte[] mydata = new byte[1 << 20]; + new Random().nextBytes(mydata); - final int one_third = data.length/3; + final Path p = new Path(dir, "file"); + FSDataOutputStream out = fs.create(p, false, 4096, (short)3, 1L << 17); + out.write(mydata, 0, mydata.length); + out.close(); + + final int one_third = mydata.length/3; final int two_third = one_third*2; { //test seek final int offset = one_third; - final int len = data.length - offset; + final int len = mydata.length - offset; final byte[] buf = new byte[len]; final FSDataInputStream in = fs.open(p); @@ -225,13 +231,13 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { for (int i = 0; i < buf.length; i++) { assertEquals("Position " + i + ", offset=" + offset + ", length=" + len, - data[i + offset], buf[i]); + mydata[i + offset], buf[i]); } } { //test position read (read the data after the two_third location) final int offset = two_third; - final int len = data.length - offset; + final int len = mydata.length - offset; final byte[] buf = new byte[len]; final FSDataInputStream in = fs.open(p); @@ -240,7 +246,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { for (int i = 0; i < buf.length; i++) { assertEquals("Position " + i + ", offset=" + offset + ", length=" + len, - data[i + offset], buf[i]); + mydata[i + offset], buf[i]); } } }