diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
new file mode 100644
index 00000000000..07e65f5c301
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * FSDataInputStreams implement this interface to indicate that they can clear
+ * their buffers on request.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface CanUnbuffer {
+  /**
+   * Reduce the buffering.  This will also free sockets and file descriptors
+   * held by the stream, if possible.
+   */
+  public void unbuffer();
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 6d39d1e0b89..477bd6f47ee 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable, ByteBufferReadable,
       HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess {
+      HasEnhancedByteBufferAccess, CanUnbuffer {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool
    * objects
@@ -220,4 +220,14 @@ public class FSDataInputStream extends DataInputStream
       bufferPool.putBuffer(buffer);
     }
   }
+
+  @Override
+  public void unbuffer() {
+    try {
+      ((CanUnbuffer)in).unbuffer();
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("this stream does not " +
+          "support unbuffering.");
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dcbf36384c0..481cea8c83b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -324,6 +324,8 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7761. cleanup unnecssary code logic in LocatedBlock. (yliu)
 
+    HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
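FSDataInputStream#unbuffer delegates to the wrapped stream and converts the resulting ClassCastException into an UnsupportedOperationException when the wrapped stream does not implement CanUnbuffer, so callers that may run over non-HDFS streams should be prepared to catch it. A minimal caller-side sketch, not part of this patch (the helper name and the 4 KB header size are illustrative, and the file is assumed to be at least that long):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper, not part of this patch: read a header, then drop
// the stream's buffers and sockets while keeping the stream open.
public class UnbufferExample {
  public static FSDataInputStream openAndIdle(FileSystem fs, Path path)
      throws IOException {
    FSDataInputStream in = fs.open(path);
    byte[] header = new byte[4096];
    in.readFully(0, header);   // positioned read; assumes >= 4096 bytes
    try {
      in.unbuffer();           // free buffers; sockets can return to the cache
    } catch (UnsupportedOperationException e) {
      // The wrapped stream does not implement CanUnbuffer; the stream is
      // still usable, it simply keeps its buffers.
    }
    return in;
  }
}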
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 618f0407dfe..25c23e13dd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -86,7 +87,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
 implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-    HasEnhancedByteBufferAccess {
+    HasEnhancedByteBufferAccess, CanUnbuffer {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1818,4 +1819,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       ((ByteBufferPool)val).putBuffer(buffer);
     }
   }
+
+  @Override
+  public synchronized void unbuffer() {
+    closeCurrentBlockReader();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
index 07c562e1df1..08b0468bfa8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -29,16 +29,21 @@ import com.google.common.collect.LinkedListMultimap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 /**
  * A cache of input stream sockets to Data Node.
 */
-class PeerCache {
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
   private static final Log LOG = LogFactory.getLog(PeerCache.class);
 
   private static class Key {
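On the HDFS side, unbuffer() simply closes the current block reader, which returns any cached remote peer to the now-public PeerCache; that is what the new test at the end of this patch observes. Since FSDataInputStream#unbuffer throws for streams that lack the capability, a caller holding a mix of stream types can also test the wrapped stream up front instead of catching the exception each time. A hedged sketch, not part of this patch (UnbufferUtil and unbufferIfPossible are hypothetical names):

import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataInputStream;

// Hypothetical capability check, not part of this patch.
public final class UnbufferUtil {
  private UnbufferUtil() {}

  public static void unbufferIfPossible(FSDataInputStream stream) {
    // FSDataInputStream#getWrappedStream exposes the underlying stream;
    // only delegate when it actually implements CanUnbuffer.
    if (stream.getWrappedStream() instanceof CanUnbuffer) {
      stream.unbuffer();
    }
  }
}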
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
index 6e6dd216473..27a28097efe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
@@ -1037,6 +1037,34 @@ done:
     return file;
 }
 
+int hdfsUnbufferFile(hdfsFile file)
+{
+    int ret;
+    jthrowable jthr;
+    JNIEnv *env = getJNIEnv();
+
+    if (!env) {
+        ret = EINTERNAL;
+        goto done;
+    }
+    if (file->type != HDFS_STREAM_INPUT) {
+        ret = ENOTSUP;
+        goto done;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
+                     "unbuffer", "()V");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                HADOOP_ISTRM "#unbuffer failed:");
+        goto done;
+    }
+    ret = 0;
+
+done:
+    errno = ret;
+    return ret;
+}
+
 int hdfsCloseFile(hdfsFS fs, hdfsFile file)
 {
     int ret;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
index 072051f2614..3406d6b481e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
@@ -347,6 +347,15 @@ extern "C" {
     hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                           int bufferSize, short replication, tSize blocksize);
 
+    /**
+     * hdfsUnbufferFile - Reduce the buffering done on a file.
+     *
+     * @param file  The file to unbuffer.
+     * @return      0 on success
+     *              ENOTSUP if the file does not support unbuffering
+     *              Errno will also be set to this value.
+     */
+    int hdfsUnbufferFile(hdfsFile file);
 
     /**
      * hdfsCloseFile - Close an open file.
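For native clients, a typical call sequence might look like the following. This is a hypothetical helper, not part of the patch: error handling is abbreviated, the buffer size is arbitrary, and everything other than hdfsUnbufferFile is the pre-existing libhdfs API.

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>

#include "hdfs.h"

/* Hypothetical helper, not part of this patch: read some data, then drop
 * the stream's buffers and sockets while keeping the handle open. */
int readAndUnbuffer(hdfsFS fs, const char *path)
{
    char buf[4096];
    hdfsFile file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
    if (!file) {
        fprintf(stderr, "failed to open %s\n", path);
        return -1;
    }
    if (hdfsRead(fs, file, buf, sizeof(buf)) < 0) {
        fprintf(stderr, "read of %s failed: errno=%d\n", path, errno);
        hdfsCloseFile(fs, file);
        return -1;
    }
    /* Returns 0 on success, or ENOTSUP (also stored in errno) if this
     * stream cannot be unbuffered, e.g. because it was opened for write. */
    if (hdfsUnbufferFile(file)) {
        fprintf(stderr, "%s does not support unbuffering\n", path);
    }
    hdfsCloseFile(fs, file);
    return 0;
}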
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
new file mode 100644
index 00000000000..52c33e95df7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnbuffer {
+  private static final Log LOG =
+      LogFactory.getLog(TestUnbuffer.class.getName());
+
+  /**
+   * Test that calling Unbuffer closes sockets.
+   */
+  @Test
+  public void testUnbufferClosesSockets() throws Exception {
+    Configuration conf = new Configuration();
+    // Set a new ClientContext.  This way, we will have our own PeerCache,
+    // rather than sharing one with other unit tests.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        "testUnbufferClosesSocketsContext");
+
+    // Disable short-circuit reads.  With short-circuit, we wouldn't hold open a
+    // TCP socket.
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+
+    // Set a really long socket timeout to avoid test timing issues.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        100000000L);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+        100000000L);
+
+    MiniDFSCluster cluster = null;
+    FSDataInputStream stream = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = (DistributedFileSystem)
+          FileSystem.newInstance(conf);
+      final Path TEST_PATH = new Path("/test1");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
+      stream = dfs.open(TEST_PATH);
+      // Read a byte.  This will trigger the creation of a block reader.
+      stream.seek(2);
+      int b = stream.read();
+      Assert.assertTrue(-1 != b);
+
+      // The Peer cache should start off empty.
+      PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
+      Assert.assertEquals(0, cache.size());
+
+      // Unbuffer should clear the block reader and return the socket to the
+      // cache.
+      stream.unbuffer();
+      stream.seek(2);
+      Assert.assertEquals(1, cache.size());
+      int b2 = stream.read();
+      Assert.assertEquals(b, b2);
+    } finally {
+      if (stream != null) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test opening many files via TCP (not short-circuit).
+   *
+   * This is practical when using unbuffer, because it reduces the number of
+   * sockets and amount of memory that we use.
+   */
+  @Test
+  public void testOpenManyFilesViaTcp() throws Exception {
+    final int NUM_OPENS = 500;
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    MiniDFSCluster cluster = null;
+    FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path TEST_PATH = new Path("/testFile");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
+
+      for (int i = 0; i < NUM_OPENS; i++) {
+        streams[i] = dfs.open(TEST_PATH);
+        LOG.info("opening file " + i + "...");
+        Assert.assertTrue(-1 != streams[i].read());
+        streams[i].unbuffer();
+      }
+    } finally {
+      for (FSDataInputStream stream : streams) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
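The second test mirrors the usage pattern this change targets: a long-running service keeps many files open across requests and calls unbuffer() between reads, so idle streams pin no sockets or block-reader buffers. A condensed sketch of that pattern, assuming a hypothetical StreamCache class that is not part of the patch:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical long-lived reader cache, not part of this patch: streams stay
// open across requests but hold no sockets or buffers while idle.
public class StreamCache {
  private final Map<Path, FSDataInputStream> open =
      new HashMap<Path, FSDataInputStream>();
  private final FileSystem fs;

  public StreamCache(FileSystem fs) {
    this.fs = fs;
  }

  public synchronized int readByteAt(Path path, long offset)
      throws IOException {
    FSDataInputStream in = open.get(path);
    if (in == null) {
      in = fs.open(path);
      open.put(path, in);
    }
    in.seek(offset);
    int b = in.read();    // re-creates the block reader after an unbuffer
    in.unbuffer();        // drop the block reader; the socket returns to the
                          // PeerCache until the next read
    return b;
  }
}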