HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)

(cherry picked from commit 6b39ad0865)
This commit is contained in:
Colin Patrick Mccabe 2015-02-12 10:40:46 -08:00
parent 92f52e8962
commit e35788aa5a
8 changed files with 227 additions and 4 deletions

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* FSDataInputStreams implement this interface to indicate that they can clear
* their buffers on request.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface CanUnbuffer {
/**
* Reduce the buffering. This will also free sockets and file descriptors
* held by the stream, if possible.
*/
public void unbuffer();
}

View File

@ -35,7 +35,7 @@
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess {
HasEnhancedByteBufferAccess, CanUnbuffer {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@ -220,4 +220,14 @@ public void releaseBuffer(ByteBuffer buffer) {
bufferPool.putBuffer(buffer);
}
}
@Override
public void unbuffer() {
try {
((CanUnbuffer)in).unbuffer();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("this stream does not " +
"support unbuffering.");
}
}
}

View File

@ -324,6 +324,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7761. cleanup unnecssary code logic in LocatedBlock. (yliu)
HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@ -86,7 +87,7 @@
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess {
HasEnhancedByteBufferAccess, CanUnbuffer {
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
@ -1818,4 +1819,9 @@ public synchronized void releaseBuffer(ByteBuffer buffer) {
((ByteBufferPool)val).putBuffer(buffer);
}
}
@Override
public synchronized void unbuffer() {
closeCurrentBlockReader();
}
}

View File

@ -29,16 +29,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
/**
* A cache of input stream sockets to Data Node.
*/
class PeerCache {
@InterfaceStability.Unstable
@InterfaceAudience.Private
@VisibleForTesting
public class PeerCache {
private static final Log LOG = LogFactory.getLog(PeerCache.class);
private static class Key {

View File

@ -1037,6 +1037,34 @@ done:
return file;
}
int hdfsUnbufferFile(hdfsFile file)
{
int ret;
jthrowable jthr;
JNIEnv *env = getJNIEnv();
if (!env) {
ret = EINTERNAL;
goto done;
}
if (file->type != HDFS_STREAM_INPUT) {
ret = ENOTSUP;
goto done;
}
jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
"unbuffer", "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
HADOOP_ISTRM "#unbuffer failed:");
goto done;
}
ret = 0;
done:
errno = ret;
return ret;
}
int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
int ret;

View File

@ -347,6 +347,15 @@ extern "C" {
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize);
/**
* hdfsUnbufferFile - Reduce the buffering done on a file.
*
* @param file The file to unbuffer.
* @return 0 on success
* ENOTSUP if the file does not support unbuffering
* Errno will also be set to this value.
*/
int hdfsUnbufferFile(hdfsFile file);
/**
* hdfsCloseFile - Close an open file.

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestUnbuffer {
private static final Log LOG =
LogFactory.getLog(TestUnbuffer.class.getName());
/**
* Test that calling Unbuffer closes sockets.
*/
@Test
public void testUnbufferClosesSockets() throws Exception {
Configuration conf = new Configuration();
// Set a new ClientContext. This way, we will have our own PeerCache,
// rather than sharing one with other unit tests.
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
"testUnbufferClosesSocketsContext");
// Disable short-circuit reads. With short-circuit, we wouldn't hold open a
// TCP socket.
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
// Set a really long socket timeout to avoid test timing issues.
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
100000000L);
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
100000000L);
MiniDFSCluster cluster = null;
FSDataInputStream stream = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
DistributedFileSystem dfs = (DistributedFileSystem)
FileSystem.newInstance(conf);
final Path TEST_PATH = new Path("/test1");
DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
stream = dfs.open(TEST_PATH);
// Read a byte. This will trigger the creation of a block reader.
stream.seek(2);
int b = stream.read();
Assert.assertTrue(-1 != b);
// The Peer cache should start off empty.
PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
Assert.assertEquals(0, cache.size());
// Unbuffer should clear the block reader and return the socket to the
// cache.
stream.unbuffer();
stream.seek(2);
Assert.assertEquals(1, cache.size());
int b2 = stream.read();
Assert.assertEquals(b, b2);
} finally {
if (stream != null) {
IOUtils.cleanup(null, stream);
}
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test opening many files via TCP (not short-circuit).
*
* This is practical when using unbuffer, because it reduces the number of
* sockets and amount of memory that we use.
*/
@Test
public void testOpenManyFilesViaTcp() throws Exception {
final int NUM_OPENS = 500;
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
MiniDFSCluster cluster = null;
FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
try {
cluster = new MiniDFSCluster.Builder(conf).build();
DistributedFileSystem dfs = cluster.getFileSystem();
final Path TEST_PATH = new Path("/testFile");
DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
for (int i = 0; i < NUM_OPENS; i++) {
streams[i] = dfs.open(TEST_PATH);
LOG.info("opening file " + i + "...");
Assert.assertTrue(-1 != streams[i].read());
streams[i].unbuffer();
}
} finally {
for (FSDataInputStream stream : streams) {
IOUtils.cleanup(null, stream);
}
if (cluster != null) {
cluster.shutdown();
}
}
}
}