diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e016ea2ea67..e797c4b8be4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1283,6 +1283,9 @@ Release 0.23.3 - UNRELEASED HDFS-3646. LeaseRenewer can hold reference to inactive DFSClient instances forever. (Kihwal Lee via daryn) + HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations + to get around a Java library bug causing OutOfMemoryError. (szetszwo) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 4327dd18c5b..420d74d5829 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -423,6 +423,7 @@ public class WebHdfsFileSystem extends FileSystem //Step 2) Submit another Http request with the URL from the Location header with data. conn = (HttpURLConnection)new URL(redirect).openConnection(); conn.setRequestMethod(op.getType().toString()); + conn.setChunkedStreamingMode(32 << 10); //32kB-chunk return conn; } @@ -835,8 +836,7 @@ public class WebHdfsFileSystem extends FileSystem } private static WebHdfsFileSystem getWebHdfs( - final Token token, final Configuration conf - ) throws IOException, InterruptedException, URISyntaxException { + final Token token, final Configuration conf) throws IOException { final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token); final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr); @@ -850,12 +850,7 @@ public class WebHdfsFileSystem extends FileSystem // update the kerberos credentials, if they are coming from a keytab ugi.reloginFromKeytab(); - try { - WebHdfsFileSystem webhdfs = getWebHdfs(token, conf); - return webhdfs.renewDelegationToken(token); - } catch (URISyntaxException e) { - throw new IOException(e); - } + return getWebHdfs(token, conf).renewDelegationToken(token); } @Override @@ -865,12 +860,7 @@ public class WebHdfsFileSystem extends FileSystem // update the kerberos credentials, if they are coming from a keytab ugi.checkTGTAndReloginFromKeytab(); - try { - final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf); - webhdfs.cancelDelegationToken(token); - } catch (URISyntaxException e) { - throw new IOException(e); - } + getWebHdfs(token, conf).cancelDelegationToken(token); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java new file mode 100644 index 00000000000..66f65cc2792 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.web; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.Assert; +import org.junit.Test; + +/** Test WebHDFS */ +public class TestWebHDFS { + static final Log LOG = LogFactory.getLog(TestWebHDFS.class); + + static final Random RANDOM = new Random(); + + static final long systemStartTime = System.nanoTime(); + + /** A timer for measuring performance. */ + static class Ticker { + final String name; + final long startTime = System.nanoTime(); + private long previousTick = startTime; + + Ticker(final String name, String format, Object... args) { + this.name = name; + LOG.info(String.format("\n\n%s START: %s\n", + name, String.format(format, args))); + } + + void tick(final long nBytes, String format, Object... args) { + final long now = System.nanoTime(); + if (now - previousTick > 10000000000L) { + previousTick = now; + final double mintues = (now - systemStartTime)/60000000000.0; + LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, mintues, + String.format(format, args), toMpsString(nBytes, now))); + } + } + + void end(final long nBytes) { + final long now = System.nanoTime(); + final double seconds = (now - startTime)/1000000000.0; + LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n", + name, seconds, toMpsString(nBytes, now))); + } + + String toMpsString(final long nBytes, final long now) { + final double mb = nBytes/(double)(1<<20); + final double mps = mb*1000000000.0/(now - startTime); + return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps); + } + } + + @Test + public void testLargeFile() throws Exception { + largeFileTest(200L << 20); //200MB file length + } + + /** Test read and write large files. */ + static void largeFileTest(final long fileLength) throws Exception { + final Configuration conf = WebHdfsTestUtil.createConf(); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(3) + .build(); + try { + cluster.waitActive(); + + final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf); + final Path dir = new Path("/test/largeFile"); + Assert.assertTrue(fs.mkdirs(dir)); + + final byte[] data = new byte[1 << 20]; + RANDOM.nextBytes(data); + + final byte[] expected = new byte[2 * data.length]; + System.arraycopy(data, 0, expected, 0, data.length); + System.arraycopy(data, 0, expected, data.length, data.length); + + final Path p = new Path(dir, "file"); + final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength); + final FSDataOutputStream out = fs.create(p); + try { + long remaining = fileLength; + for(; remaining > 0;) { + t.tick(fileLength - remaining, "remaining=%d", remaining); + + final int n = (int)Math.min(remaining, data.length); + out.write(data, 0, n); + remaining -= n; + } + } finally { + out.close(); + } + t.end(fileLength); + + Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen()); + + final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20); + final long largeOffset = fileLength - smallOffset; + final byte[] buf = new byte[data.length]; + + verifySeek(fs, p, largeOffset, fileLength, buf, expected); + verifySeek(fs, p, smallOffset, fileLength, buf, expected); + + verifyPread(fs, p, largeOffset, fileLength, buf, expected); + } finally { + cluster.shutdown(); + } + } + + static void checkData(long offset, long remaining, int n, + byte[] actual, byte[] expected) { + if (RANDOM.nextInt(100) == 0) { + int j = (int)(offset % actual.length); + for(int i = 0; i < n; i++) { + if (expected[j] != actual[i]) { + Assert.fail("expected[" + j + "]=" + expected[j] + + " != actual[" + i + "]=" + actual[i] + + ", offset=" + offset + ", remaining=" + remaining + ", n=" + n); + } + j++; + } + } + } + + /** test seek */ + static void verifySeek(FileSystem fs, Path p, long offset, long length, + byte[] buf, byte[] expected) throws IOException { + long remaining = length - offset; + long checked = 0; + LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining); + + final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d", + offset, remaining); + final FSDataInputStream in = fs.open(p, 64 << 10); + in.seek(offset); + for(; remaining > 0; ) { + t.tick(checked, "offset=%d, remaining=%d", offset, remaining); + final int n = (int)Math.min(remaining, buf.length); + in.readFully(buf, 0, n); + checkData(offset, remaining, n, buf, expected); + + offset += n; + remaining -= n; + checked += n; + } + in.close(); + t.end(checked); + } + + static void verifyPread(FileSystem fs, Path p, long offset, long length, + byte[] buf, byte[] expected) throws IOException { + long remaining = length - offset; + long checked = 0; + LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining); + + final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d", + offset, remaining); + final FSDataInputStream in = fs.open(p, 64 << 10); + for(; remaining > 0; ) { + t.tick(checked, "offset=%d, remaining=%d", offset, remaining); + final int n = (int)Math.min(remaining, buf.length); + in.readFully(offset, buf, 0, n); + checkData(offset, remaining, n, buf, expected); + + offset += n; + remaining -= n; + checked += n; + } + in.close(); + t.end(checked); + } +}