HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations to get around a Java library bug causing OutOfMemoryError.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1365839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06b394dcf9
commit
8f395c2f78
|
@ -1403,6 +1403,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
HDFS-3646. LeaseRenewer can hold reference to inactive DFSClient
|
HDFS-3646. LeaseRenewer can hold reference to inactive DFSClient
|
||||||
instances forever. (Kihwal Lee via daryn)
|
instances forever. (Kihwal Lee via daryn)
|
||||||
|
|
||||||
|
HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
|
||||||
|
to get around a Java library bug causing OutOfMemoryError. (szetszwo)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -423,6 +423,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
//Step 2) Submit another Http request with the URL from the Location header with data.
|
//Step 2) Submit another Http request with the URL from the Location header with data.
|
||||||
conn = (HttpURLConnection)new URL(redirect).openConnection();
|
conn = (HttpURLConnection)new URL(redirect).openConnection();
|
||||||
conn.setRequestMethod(op.getType().toString());
|
conn.setRequestMethod(op.getType().toString());
|
||||||
|
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -835,8 +836,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
}
|
}
|
||||||
|
|
||||||
private static WebHdfsFileSystem getWebHdfs(
|
private static WebHdfsFileSystem getWebHdfs(
|
||||||
final Token<?> token, final Configuration conf
|
final Token<?> token, final Configuration conf) throws IOException {
|
||||||
) throws IOException, InterruptedException, URISyntaxException {
|
|
||||||
|
|
||||||
final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
|
final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||||
final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
|
final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
|
||||||
|
@ -850,12 +850,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
// update the kerberos credentials, if they are coming from a keytab
|
// update the kerberos credentials, if they are coming from a keytab
|
||||||
ugi.reloginFromKeytab();
|
ugi.reloginFromKeytab();
|
||||||
|
|
||||||
try {
|
return getWebHdfs(token, conf).renewDelegationToken(token);
|
||||||
WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
||||||
return webhdfs.renewDelegationToken(token);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -865,12 +860,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
// update the kerberos credentials, if they are coming from a keytab
|
// update the kerberos credentials, if they are coming from a keytab
|
||||||
ugi.checkTGTAndReloginFromKeytab();
|
ugi.checkTGTAndReloginFromKeytab();
|
||||||
|
|
||||||
try {
|
getWebHdfs(token, conf).cancelDelegationToken(token);
|
||||||
final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
|
|
||||||
webhdfs.cancelDelegationToken(token);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,199 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/** Test WebHDFS */
|
||||||
|
public class TestWebHDFS {
|
||||||
|
static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
|
||||||
|
|
||||||
|
static final Random RANDOM = new Random();
|
||||||
|
|
||||||
|
static final long systemStartTime = System.nanoTime();
|
||||||
|
|
||||||
|
/** A timer for measuring performance. */
|
||||||
|
static class Ticker {
|
||||||
|
final String name;
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
|
private long previousTick = startTime;
|
||||||
|
|
||||||
|
Ticker(final String name, String format, Object... args) {
|
||||||
|
this.name = name;
|
||||||
|
LOG.info(String.format("\n\n%s START: %s\n",
|
||||||
|
name, String.format(format, args)));
|
||||||
|
}
|
||||||
|
|
||||||
|
void tick(final long nBytes, String format, Object... args) {
|
||||||
|
final long now = System.nanoTime();
|
||||||
|
if (now - previousTick > 10000000000L) {
|
||||||
|
previousTick = now;
|
||||||
|
final double mintues = (now - systemStartTime)/60000000000.0;
|
||||||
|
LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, mintues,
|
||||||
|
String.format(format, args), toMpsString(nBytes, now)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void end(final long nBytes) {
|
||||||
|
final long now = System.nanoTime();
|
||||||
|
final double seconds = (now - startTime)/1000000000.0;
|
||||||
|
LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
|
||||||
|
name, seconds, toMpsString(nBytes, now)));
|
||||||
|
}
|
||||||
|
|
||||||
|
String toMpsString(final long nBytes, final long now) {
|
||||||
|
final double mb = nBytes/(double)(1<<20);
|
||||||
|
final double mps = mb*1000000000.0/(now - startTime);
|
||||||
|
return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLargeFile() throws Exception {
|
||||||
|
largeFileTest(200L << 20); //200MB file length
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Test read and write large files. */
|
||||||
|
static void largeFileTest(final long fileLength) throws Exception {
|
||||||
|
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
|
||||||
|
final Path dir = new Path("/test/largeFile");
|
||||||
|
Assert.assertTrue(fs.mkdirs(dir));
|
||||||
|
|
||||||
|
final byte[] data = new byte[1 << 20];
|
||||||
|
RANDOM.nextBytes(data);
|
||||||
|
|
||||||
|
final byte[] expected = new byte[2 * data.length];
|
||||||
|
System.arraycopy(data, 0, expected, 0, data.length);
|
||||||
|
System.arraycopy(data, 0, expected, data.length, data.length);
|
||||||
|
|
||||||
|
final Path p = new Path(dir, "file");
|
||||||
|
final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength);
|
||||||
|
final FSDataOutputStream out = fs.create(p);
|
||||||
|
try {
|
||||||
|
long remaining = fileLength;
|
||||||
|
for(; remaining > 0;) {
|
||||||
|
t.tick(fileLength - remaining, "remaining=%d", remaining);
|
||||||
|
|
||||||
|
final int n = (int)Math.min(remaining, data.length);
|
||||||
|
out.write(data, 0, n);
|
||||||
|
remaining -= n;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
t.end(fileLength);
|
||||||
|
|
||||||
|
Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());
|
||||||
|
|
||||||
|
final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
|
||||||
|
final long largeOffset = fileLength - smallOffset;
|
||||||
|
final byte[] buf = new byte[data.length];
|
||||||
|
|
||||||
|
verifySeek(fs, p, largeOffset, fileLength, buf, expected);
|
||||||
|
verifySeek(fs, p, smallOffset, fileLength, buf, expected);
|
||||||
|
|
||||||
|
verifyPread(fs, p, largeOffset, fileLength, buf, expected);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkData(long offset, long remaining, int n,
|
||||||
|
byte[] actual, byte[] expected) {
|
||||||
|
if (RANDOM.nextInt(100) == 0) {
|
||||||
|
int j = (int)(offset % actual.length);
|
||||||
|
for(int i = 0; i < n; i++) {
|
||||||
|
if (expected[j] != actual[i]) {
|
||||||
|
Assert.fail("expected[" + j + "]=" + expected[j]
|
||||||
|
+ " != actual[" + i + "]=" + actual[i]
|
||||||
|
+ ", offset=" + offset + ", remaining=" + remaining + ", n=" + n);
|
||||||
|
}
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** test seek */
|
||||||
|
static void verifySeek(FileSystem fs, Path p, long offset, long length,
|
||||||
|
byte[] buf, byte[] expected) throws IOException {
|
||||||
|
long remaining = length - offset;
|
||||||
|
long checked = 0;
|
||||||
|
LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);
|
||||||
|
|
||||||
|
final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
|
||||||
|
offset, remaining);
|
||||||
|
final FSDataInputStream in = fs.open(p, 64 << 10);
|
||||||
|
in.seek(offset);
|
||||||
|
for(; remaining > 0; ) {
|
||||||
|
t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
|
||||||
|
final int n = (int)Math.min(remaining, buf.length);
|
||||||
|
in.readFully(buf, 0, n);
|
||||||
|
checkData(offset, remaining, n, buf, expected);
|
||||||
|
|
||||||
|
offset += n;
|
||||||
|
remaining -= n;
|
||||||
|
checked += n;
|
||||||
|
}
|
||||||
|
in.close();
|
||||||
|
t.end(checked);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void verifyPread(FileSystem fs, Path p, long offset, long length,
|
||||||
|
byte[] buf, byte[] expected) throws IOException {
|
||||||
|
long remaining = length - offset;
|
||||||
|
long checked = 0;
|
||||||
|
LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining);
|
||||||
|
|
||||||
|
final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
|
||||||
|
offset, remaining);
|
||||||
|
final FSDataInputStream in = fs.open(p, 64 << 10);
|
||||||
|
for(; remaining > 0; ) {
|
||||||
|
t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
|
||||||
|
final int n = (int)Math.min(remaining, buf.length);
|
||||||
|
in.readFully(offset, buf, 0, n);
|
||||||
|
checkData(offset, remaining, n, buf, expected);
|
||||||
|
|
||||||
|
offset += n;
|
||||||
|
remaining -= n;
|
||||||
|
checked += n;
|
||||||
|
}
|
||||||
|
in.close();
|
||||||
|
t.end(checked);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue