HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
commit 6b39ad0865
parent bc5aa7d0b1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * FSDataInputStreams implement this interface to indicate that they can clear
+ * their buffers on request.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface CanUnbuffer {
+  /**
+   * Reduce the buffering. This will also free sockets and file descriptors
+   * held by the stream, if possible.
+   */
+  public void unbuffer();
+}
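Editor's note: a minimal caller-side sketch (not part of this commit) of how the new interface is meant to be used. It assumes a reachable default FileSystem and an existing file; the path is illustrative. getWrappedStream() is the existing FSDataInputStream accessor for the underlying stream.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnbufferSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path("/example"));  // illustrative path
    in.read();
    // Only unbuffer when the wrapped stream actually implements CanUnbuffer;
    // otherwise FSDataInputStream#unbuffer throws UnsupportedOperationException.
    if (in.getWrappedStream() instanceof CanUnbuffer) {
      in.unbuffer();  // frees sockets and file descriptors until the next read
    }
    in.close();
  }
}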
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable,
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess {
+      HasEnhancedByteBufferAccess, CanUnbuffer {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool
    * objects
@@ -220,4 +220,14 @@ public class FSDataInputStream extends DataInputStream
       bufferPool.putBuffer(buffer);
     }
   }
+
+  @Override
+  public void unbuffer() {
+    try {
+      ((CanUnbuffer)in).unbuffer();
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("this stream does not " +
+          "support unbuffering.");
+    }
+  }
 }
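The wrapper above rethrows the ClassCastException as UnsupportedOperationException, so callers that cannot probe the wrapped stream can simply catch it. A hedged sketch; the helper class and method name are hypothetical, not part of this commit.

import org.apache.hadoop.fs.FSDataInputStream;

public final class UnbufferUtil {
  // Hypothetical helper: unbuffer if supported, otherwise do nothing.
  public static void unbufferQuietly(FSDataInputStream in) {
    try {
      in.unbuffer();
    } catch (UnsupportedOperationException e) {
      // The wrapped stream does not implement CanUnbuffer; nothing to free.
    }
  }
}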
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -624,6 +624,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7703. Support favouredNodes for the append for new blocks
     (vinayakumarb)
 
+    HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -86,7 +87,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
 implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-    HasEnhancedByteBufferAccess {
+    HasEnhancedByteBufferAccess, CanUnbuffer {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1818,4 +1819,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       ((ByteBufferPool)val).putBuffer(buffer);
     }
   }
+
+  @Override
+  public synchronized void unbuffer() {
+    closeCurrentBlockReader();
+  }
 }
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -29,16 +29,21 @@ import com.google.common.collect.LinkedListMultimap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 /**
  * A cache of input stream sockets to Data Node.
  */
-class PeerCache {
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
   private static final Log LOG = LogFactory.getLog(PeerCache.class);
 
   private static class Key {
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
@@ -1037,6 +1037,34 @@ done:
     return file;
 }
 
+int hdfsUnbufferFile(hdfsFile file)
+{
+    int ret;
+    jthrowable jthr;
+    JNIEnv *env = getJNIEnv();
+
+    if (!env) {
+        ret = EINTERNAL;
+        goto done;
+    }
+    if (file->type != HDFS_STREAM_INPUT) {
+        ret = ENOTSUP;
+        goto done;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
+                        "unbuffer", "()V");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                HADOOP_ISTRM "#unbuffer failed:");
+        goto done;
+    }
+    ret = 0;
+
+done:
+    errno = ret;
+    return ret;
+}
+
 int hdfsCloseFile(hdfsFS fs, hdfsFile file)
 {
     int ret;
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/include/hdfs/hdfs.h
@@ -347,6 +347,15 @@ extern "C" {
     hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                           int bufferSize, short replication, tSize blocksize);
 
+    /**
+     * hdfsUnbufferFile - Reduce the buffering done on a file.
+     *
+     * @param file  The file to unbuffer.
+     * @return      0 on success
+     *              ENOTSUP if the file does not support unbuffering.
+     *              Errno will also be set to this value.
+     */
+    int hdfsUnbufferFile(hdfsFile file);
 
     /**
      * hdfsCloseFile - Close an open file.
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnbuffer {
+  private static final Log LOG =
+      LogFactory.getLog(TestUnbuffer.class.getName());
+
+  /**
+   * Test that calling Unbuffer closes sockets.
+   */
+  @Test
+  public void testUnbufferClosesSockets() throws Exception {
+    Configuration conf = new Configuration();
+    // Set a new ClientContext. This way, we will have our own PeerCache,
+    // rather than sharing one with other unit tests.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        "testUnbufferClosesSocketsContext");
+
+    // Disable short-circuit reads. With short-circuit, we wouldn't hold open a
+    // TCP socket.
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+
+    // Set a really long socket timeout to avoid test timing issues.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        100000000L);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+        100000000L);
+
+    MiniDFSCluster cluster = null;
+    FSDataInputStream stream = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = (DistributedFileSystem)
+          FileSystem.newInstance(conf);
+      final Path TEST_PATH = new Path("/test1");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
+      stream = dfs.open(TEST_PATH);
+      // Read a byte. This will trigger the creation of a block reader.
+      stream.seek(2);
+      int b = stream.read();
+      Assert.assertTrue(-1 != b);
+
+      // The Peer cache should start off empty.
+      PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
+      Assert.assertEquals(0, cache.size());
+
+      // Unbuffer should clear the block reader and return the socket to the
+      // cache.
+      stream.unbuffer();
+      stream.seek(2);
+      Assert.assertEquals(1, cache.size());
+      int b2 = stream.read();
+      Assert.assertEquals(b, b2);
+    } finally {
+      if (stream != null) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test opening many files via TCP (not short-circuit).
+   *
+   * This is practical when using unbuffer, because it reduces the number of
+   * sockets and amount of memory that we use.
+   */
+  @Test
+  public void testOpenManyFilesViaTcp() throws Exception {
+    final int NUM_OPENS = 500;
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    MiniDFSCluster cluster = null;
+    FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path TEST_PATH = new Path("/testFile");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
+
+      for (int i = 0; i < NUM_OPENS; i++) {
+        streams[i] = dfs.open(TEST_PATH);
+        LOG.info("opening file " + i + "...");
+        Assert.assertTrue(-1 != streams[i].read());
+        streams[i].unbuffer();
+      }
+    } finally {
+      for (FSDataInputStream stream : streams) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
}
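The second test above motivates the feature: many open handles, few live sockets. A condensed sketch of that access pattern outside the test harness, assuming a reachable HDFS and an existing file; the path and count are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManyHandlesSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream[] streams = new FSDataInputStream[500];
    for (int i = 0; i < streams.length; i++) {
      streams[i] = fs.open(new Path("/testFile"));  // illustrative path
      streams[i].read();
      streams[i].unbuffer();  // return the TCP socket to the PeerCache
    }
    // The handles stay open and can be re-read later; unbuffer() only drops
    // buffers, sockets, and file descriptors, not the stream itself.
    for (FSDataInputStream s : streams) {
      s.close();
    }
  }
}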