diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1269d66bec3..88e5dd76cf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -334,6 +334,9 @@ Release 2.4.0 - UNRELEASED
 
     HDFS-6078. TestIncrementalBlockReports is flaky. (Arpit Agarwal)
 
+    HDFS-6071. BlockReaderLocal doesn't return -1 on EOF when doing a
+    zero-length read on a short file (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 9e2e92e27ab..60cb23f480c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -478,7 +478,7 @@ private synchronized int readWithBounceBuffer(ByteBuffer buf,
       total += bb;
       if (buf.remaining() == 0) return total;
     }
-    boolean eof = false;
+    boolean eof = true, done = false;
     do {
       if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) &&
           ((dataPos % bytesPerChecksum) == 0)) {
@@ -493,20 +493,24 @@ private synchronized int readWithBounceBuffer(ByteBuffer buf,
           buf.limit(oldLimit);
         }
         if (nRead < maxReadaheadLength) {
-          eof = true;
+          done = true;
+        }
+        if (nRead > 0) {
+          eof = false;
         }
         total += nRead;
       } else {
         // Slow lane: refill bounce buffer.
         if (fillDataBuf(canSkipChecksum)) {
-          eof = true;
+          done = true;
         }
         bb = drainDataBuf(buf); // drain bounce buffer if possible
         if (bb >= 0) {
+          eof = false;
           total += bb;
         }
       }
-    } while ((!eof) && (buf.remaining() > 0));
+    } while ((!done) && (buf.remaining() > 0));
     return (eof && total == 0) ? -1 : total;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index e80c777b862..ee02eb5de32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -57,11 +57,14 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.junit.Assume;
 
 import java.io.*;
 import java.net.*;
@@ -1157,4 +1160,43 @@ public static long roundUpToMultiple(long val, int factor) {
     long c = (val + factor - 1) / factor;
     return c * factor;
   }
+
+  /**
+   * A short-circuit test context which makes it easier to get a short-circuit
+   * configuration and set everything up.
+   */
+  public static class ShortCircuitTestContext implements Closeable {
+    private final String testName;
+    private final TemporarySocketDirectory sockDir;
+    private boolean closed = false;
+    private boolean formerTcpReadsDisabled;
+
+    public ShortCircuitTestContext(String testName) {
+      this.testName = testName;
+      this.sockDir = new TemporarySocketDirectory();
+      DomainSocket.disableBindPathValidation();
+      formerTcpReadsDisabled = DFSInputStream.tcpReadsDisabledForTesting;
+      Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+    }
+
+    public Configuration newConfiguration() {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+          new File(sockDir.getDir(),
+            testName + "._PORT.sock").getAbsolutePath());
+      return conf;
+    }
+
+    public String getTestName() {
+      return testName;
+    }
+
+    public void close() throws IOException {
+      if (closed) return;
+      closed = true;
+      DFSInputStream.tcpReadsDisabledForTesting = formerTcpReadsDisabled;
+      sockDir.close();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
new file mode 100644
index 00000000000..c36a86f0022
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil.ShortCircuitTestContext;
+import org.junit.Test;
+
+public class TestRead {
+  final private int BLOCK_SIZE = 512;
+
+  private void testEOF(MiniDFSCluster cluster, int fileLength) throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    Path path = new Path("testEOF." + fileLength);
+    DFSTestUtil.createFile(fs, path, fileLength, (short)1, 0xBEEFBEEF);
+    FSDataInputStream fis = fs.open(path);
+    ByteBuffer empty = ByteBuffer.allocate(0);
+    // A read into an empty bytebuffer at the beginning of the file gives 0.
+    Assert.assertEquals(0, fis.read(empty));
+    fis.seek(fileLength);
+    // A read into an empty bytebuffer at the end of the file gives -1.
+    Assert.assertEquals(-1, fis.read(empty));
+    if (fileLength > BLOCK_SIZE) {
+      fis.seek(fileLength - BLOCK_SIZE + 1);
+      ByteBuffer dbb = ByteBuffer.allocateDirect(BLOCK_SIZE);
+      Assert.assertEquals(BLOCK_SIZE - 1, fis.read(dbb));
+    }
+    fis.close();
+  }
+
+  @Test(timeout=60000)
+  public void testEOFWithBlockReaderLocal() throws Exception {
+    ShortCircuitTestContext testContext =
+        new ShortCircuitTestContext("testEOFWithBlockReaderLocal");
+    try {
+      final Configuration conf = testContext.newConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .format(true).build();
+      testEOF(cluster, 1);
+      testEOF(cluster, 14);
+      testEOF(cluster, 10000);
+      cluster.shutdown();
+    } finally {
+      testContext.close();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testEOFWithRemoteBlockReader() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .format(true).build();
+    testEOF(cluster, 1);
+    testEOF(cluster, 14);
+    testEOF(cluster, 10000);
+    cluster.shutdown();
+  }
+}
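
Note (not part of the patch): the old code used a single eof flag both to stop
the read loop and to decide whether to return -1, which conflated "stop
looping" with "no data was available at this position". The patch splits the
two roles: done now controls loop exit, while eof starts out true and is
cleared as soon as either lane observes data (nRead > 0 in the fast lane,
bb >= 0 in the slow lane). A zero-length read at the end of a short file
therefore falls through with eof still true and returns -1, which is exactly
what the new TestRead#testEOF asserts.

The sketch below is a minimal, self-contained model of that corrected control
flow. It is illustrative only, not the committed code: fill() and drain() are
hypothetical stand-ins for fillDataBuf() and drainDataBuf(), reduced to the
contracts the patch relies on (fill() reports that the block is exhausted;
drain() returns -1 when the bounce buffer holds no data at all).

import java.nio.ByteBuffer;

// Simplified model of BlockReaderLocal#readWithBounceBuffer after HDFS-6071.
class BounceBufferEofModel {
  private final ByteBuffer block;    // stands in for the on-disk block
  private ByteBuffer bounce = null;  // stands in for the bounce buffer

  BounceBufferEofModel(byte[] blockData) {
    this.block = ByteBuffer.wrap(blockData);
  }

  // Refill the bounce buffer if it is empty; return true once the underlying
  // block is exhausted (the contract the patch assumes for fillDataBuf()).
  private boolean fill() {
    if (bounce == null || !bounce.hasRemaining()) {
      byte[] chunk = new byte[Math.min(block.remaining(), 512)];
      block.get(chunk);
      bounce = (chunk.length > 0) ? ByteBuffer.wrap(chunk) : null;
    }
    return !block.hasRemaining();
  }

  // Copy what fits from the bounce buffer into buf; return the count copied,
  // or -1 if the bounce buffer held no data (the drainDataBuf() contract).
  private int drain(ByteBuffer buf) {
    if (bounce == null) {
      return -1;
    }
    int n = Math.min(buf.remaining(), bounce.remaining());
    for (int i = 0; i < n; i++) {
      buf.put(bounce.get());
    }
    return n;
  }

  // The corrected loop: done stops the loop, while eof stays true only if no
  // trip through the loop ever saw data. An empty read at end-of-file thus
  // returns -1, and an empty read anywhere else returns 0.
  int read(ByteBuffer buf) {
    int total = 0;
    boolean eof = true, done = false;
    do {
      if (fill()) {
        done = true;
      }
      int bb = drain(buf);
      if (bb >= 0) {
        eof = false;
        total += bb;
      }
    } while ((!done) && (buf.remaining() > 0));
    return (eof && total == 0) ? -1 : total;
  }
}

For example, new BounceBufferEofModel(new byte[0]).read(ByteBuffer.allocate(0))
returns -1, while the same empty-buffer read against a model still holding
unread bytes returns 0, mirroring the two assertions in testEOF above.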