From 6ec9b178c664796b44eab6c41f7216577d380f7c Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Fri, 8 Jul 2011 20:13:26 +0000
Subject: [PATCH] HDFS-2034. Length in DFSInputStream.getBlockRange(..)
 becomes -ve when reading only from a currently being written block.
 Contributed by John George

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1144480 13f79535-47bb-0310-9956-ffa450edef68
---
 hdfs/CHANGES.txt                              |  6 ++-
 .../apache/hadoop/hdfs/DFSInputStream.java    | 45 ++++++++++---------
 .../org/apache/hadoop/hdfs/TestWriteRead.java | 36 ++++++++++-----
 3 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index 6054c7a452d..5cccebf91d7 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -808,9 +808,13 @@ Trunk (unreleased changes)
     HDFS-2053. Bug in INodeDirectory#computeContentSummary warning.
     (Michael Noll via eli)
 
-    HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara 
+    HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara
     Rao G via szetszwo)
 
+    HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when
+    reading only from a currently being written block. (John George via
+    szetszwo)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 87365747795..ca08a00d809 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -294,8 +294,8 @@ public class DFSInputStream extends FSInputStream {
 
   /**
    * Get blocks in the specified range.
-   * Fetch them from the namenode if not cached.
-   * 
+   * Fetch them from the namenode if not cached. This function
+   * will not get a read request beyond the EOF.
    * @param offset
    * @param length
    * @return consequent segment of located blocks
@@ -304,28 +304,31 @@ public class DFSInputStream extends FSInputStream {
   private synchronized List<LocatedBlock> getBlockRange(long offset,
       long length) throws IOException {
-    final List<LocatedBlock> blocks;
-    if (locatedBlocks.isLastBlockComplete()) {
-      blocks = getFinalizedBlockRange(offset, length);
+    // getFileLength(): returns total file length
+    // locatedBlocks.getFileLength(): returns length of completed blocks
+    if (offset >= getFileLength()) {
+      throw new IOException("Offset: " + offset +
+          " exceeds file length: " + getFileLength());
     }
-    else {
-      final boolean readPastEnd = offset + length > locatedBlocks.getFileLength();
-      /* if requested length is greater than current file length
-       * then, it could possibly be from the current block being
-       * written to. First get the finalized block range and then
-       * if necessary, get the length of last block being written
-       * to.
-       */
-      if (readPastEnd)
-        length = locatedBlocks.getFileLength() - offset;
-      blocks = getFinalizedBlockRange(offset, length);
-      /* requested length is greater than what finalized blocks
-       * have.
-       */
-      if (readPastEnd)
-        blocks.add(locatedBlocks.getLastLocatedBlock());
+    final List<LocatedBlock> blocks;
+    final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
+    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
+    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
+
+    if (readOffsetWithinCompleteBlk) {
+      //get the blocks of finalized (completed) block range
+      blocks = getFinalizedBlockRange(offset,
+          Math.min(length, lengthOfCompleteBlk - offset));
+    } else {
+      blocks = new ArrayList<LocatedBlock>(1);
     }
+
+    // get the blocks from incomplete block range
+    if (readLengthPastCompleteBlk) {
+      blocks.add(locatedBlocks.getLastLocatedBlock());
+    }
+
     return blocks;
   }
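The bug is visible in the removed branch above: when a read began at or beyond the end of the finalized blocks, that is, entirely within the block still being written, the old code nevertheless computed length = locatedBlocks.getFileLength() - offset, which is negative in that case, and passed it to getFinalizedBlockRange(..). The rewritten method consults the finalized range only when the offset actually falls inside it, clamps the requested length with Math.min(..), and appends the last located block whenever the request extends past the completed range. The following self-contained sketch replays that arithmetic outside HDFS; the class name and printed messages are illustrative only, not part of the patch or of the HDFS API.

// Illustration: the old vs. new length arithmetic in getBlockRange(..),
// using plain longs. Values mirror testReadPosCurrentBlock below.
public class BlockRangeMath {
  public static void main(String[] args) {
    final long lengthOfCompleteBlk = 1024 * 100; // one finalized 100K block
    final long offset = lengthOfCompleteBlk + 1; // read starts inside the block under construction
    final long length = 4096;

    // Old logic: whenever the request extended past the finalized length,
    // the length was recomputed against it, even when the offset itself
    // already lay beyond the finalized blocks.
    if (offset + length > lengthOfCompleteBlk) {
      System.out.println("old finalized-range length = "
          + (lengthOfCompleteBlk - offset)); // prints -1
    }

    // New logic: touch the finalized range only if the offset is inside it.
    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
    if (readOffsetWithinCompleteBlk) {
      System.out.println("fetch finalized range of length "
          + Math.min(length, lengthOfCompleteBlk - offset));
    }
    if (readLengthPastCompleteBlk) {
      System.out.println("append the last (under construction) block");
    }
  }
}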
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
index 0baf759673c..6091ba28d5f 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
@@ -44,6 +44,7 @@ public class TestWriteRead {
 
   private static final int BUFFER_SIZE = 8192 * 100;
   private static final String ROOT_DIR = "/tmp/";
+  private static final long blockSize = 1024*100;
 
   // command-line options. Different defaults for unit test vs real cluster
   String filenameOption = ROOT_DIR + "fileX1";
@@ -69,8 +70,8 @@ public class TestWriteRead {
     LOG.info("initJunitModeTest");
 
     conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); // 100K
-                                                                // blocksize
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); // 100K
+                                                               // blocksize
 
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -99,15 +100,14 @@ public class TestWriteRead {
   }
 
   /** Junit Test reading while writing. */
-
   @Test
   public void testWriteReadSeq() throws IOException {
     useFCOption = false;
     positionReadOption = false;
     String fname = filenameOption;
-
+    long rdBeginPos = 0;
     // need to run long enough to fail: takes 25 to 35 seec on Mac
-    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
     LOG.info("Summary status from test1: status= " + stat);
     Assert.assertEquals(0, stat);
   }
@@ -117,14 +117,27 @@ public class TestWriteRead {
   public void testWriteReadPos() throws IOException {
     String fname = filenameOption;
     positionReadOption = true; // position read
-    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+    long rdBeginPos = 0;
+    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
     Assert.assertEquals(0, stat);
   }
+  /** Junit Test position read of the current block being written. */
+  @Test
+  public void testReadPosCurrentBlock() throws IOException {
+    String fname = filenameOption;
+    positionReadOption = true; // position read
+    int wrChunkSize = (int)(blockSize) + (int)(blockSize/2);
+    long rdBeginPos = blockSize+1;
+    int numTimes=5;
+    int stat = testWriteAndRead(fname, numTimes, wrChunkSize, rdBeginPos);
+    Assert.assertEquals(0, stat);
+  }
 
 
   // equivalent of TestWriteRead1
   private int clusterTestWriteRead1() throws IOException {
-    int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
+    long rdBeginPos = 0;
+    int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption, rdBeginPos);
     return stat;
   }
 
@@ -133,10 +146,9 @@ public class TestWriteRead {
    * Return number of bytes read.
    * Support both sequential read and position read.
    */
-  private long readData(String fname, byte[] buffer, long byteExpected)
+  private long readData(String fname, byte[] buffer, long byteExpected, long beginPosition)
       throws IOException {
     long totalByteRead = 0;
-    long beginPosition = 0;
     Path path = getFullyQualifiedPath(fname);
 
     FSDataInputStream in = null;
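The new testReadPosCurrentBlock drives exactly the failing path: with the 100K block size configured above, each iteration writes a 150K chunk (1.5 blocks), and every read begins at blockSize + 1, one byte past the last finalized block boundary. Below is a rough standalone sketch of that read pattern, assuming a FileSystem obtained from a MiniDFSCluster configured as in initJunitModeTest(); the class and method names are illustrative, not part of the patch.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionReadSketch {
  /** Position-read bytes that exist only in the block being written. */
  static int readIntoCurrentBlock(FileSystem fs, Path path, long blockSize)
      throws IOException {
    FSDataOutputStream out = fs.create(path);
    try {
      out.write(new byte[(int) (blockSize + blockSize / 2)]); // 1.5 blocks
      out.hflush(); // make the bytes visible; the last block stays under construction
      FSDataInputStream in = fs.open(path);
      try {
        byte[] buf = new byte[4096];
        long pos = blockSize + 1; // begins past every finalized block
        return in.read(pos, buf, 0, buf.length); // the positional-read path
      } finally {
        in.close();
      }
    } finally {
      out.close();
    }
  }
}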
@@ -263,7 +275,7 @@ public class TestWriteRead {
    * After each iteration of write, do a read of the file from begin to end.
    * Return 0 on success, else number of failure.
    */
-  private int testWriteAndRead(String fname, int loopN, int chunkSize)
+  private int testWriteAndRead(String fname, int loopN, int chunkSize, long readBeginPosition)
       throws IOException {
 
     int countOfFailures = 0;
@@ -324,7 +336,7 @@ public class TestWriteRead {
             + ". TotalByteVisible = " + totalByteVisible + " to file "
             + fname);
       }
-      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
 
       String readmsg = "Written=" + totalByteWritten + " ; Expected Visible="
           + totalByteVisible + " ; Got Visible=" + byteVisibleToRead
@@ -353,7 +365,7 @@ public class TestWriteRead {
 
       out.close();
 
-      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
 
       String readmsg2 = "Written=" + totalByteWritten + " ; Expected Visible="
           + totalByteVisible + " ; Got Visible=" + byteVisibleToRead
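With these parameters the failure is deterministic rather than timing-dependent: rdBeginPos = blockSize + 1 guarantees that every read starts past locatedBlocks.getFileLength() while the writer still holds the last block open. A minimal harness tying the pieces together might look as follows, mirroring the configuration in the hunks above; PositionReadSketch is the illustrative helper shown earlier, and teardown is kept minimal.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MiniClusterSketch {
  public static void main(String[] args) throws Exception {
    final long blockSize = 1024 * 100; // 100K blocks, as in TestWriteRead
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3).build();
    try {
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();
      // Without the fix, this read trips the negative length computed in
      // getBlockRange(..); with it, the last located block is consulted.
      int n = PositionReadSketch.readIntoCurrentBlock(
          fs, new Path("/tmp/fileX1"), blockSize);
      System.out.println("bytes read from the block being written: " + n);
    } finally {
      cluster.shutdown();
    }
  }
}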