HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when reading only from a currently being written block. Contributed by John George
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1144480 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
787dcfb8cd
commit
6ec9b178c6
|
@ -811,6 +811,10 @@ Trunk (unreleased changes)
|
||||||
HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara
|
HDFS-1990. Fix resource leaks in BlockReceiver.close(). (Uma Maheswara
|
||||||
Rao G via szetszwo)
|
Rao G via szetszwo)
|
||||||
|
|
||||||
|
HDFS-2034. Length in DFSInputStream.getBlockRange(..) becomes -ve when
|
||||||
|
reading only from a currently being written block. (John George via
|
||||||
|
szetszwo)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -294,8 +294,8 @@ public class DFSInputStream extends FSInputStream {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get blocks in the specified range.
|
* Get blocks in the specified range.
|
||||||
* Fetch them from the namenode if not cached.
|
* Fetch them from the namenode if not cached. This function
|
||||||
*
|
* will not get a read request beyond the EOF.
|
||||||
* @param offset
|
* @param offset
|
||||||
* @param length
|
* @param length
|
||||||
* @return consequent segment of located blocks
|
* @return consequent segment of located blocks
|
||||||
|
@ -304,28 +304,31 @@ public class DFSInputStream extends FSInputStream {
|
||||||
private synchronized List<LocatedBlock> getBlockRange(long offset,
|
private synchronized List<LocatedBlock> getBlockRange(long offset,
|
||||||
long length)
|
long length)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final List<LocatedBlock> blocks;
|
// getFileLength(): returns total file length
|
||||||
if (locatedBlocks.isLastBlockComplete()) {
|
// locatedBlocks.getFileLength(): returns length of completed blocks
|
||||||
blocks = getFinalizedBlockRange(offset, length);
|
if (offset >= getFileLength()) {
|
||||||
|
throw new IOException("Offset: " + offset +
|
||||||
|
" exceeds file length: " + getFileLength());
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
final boolean readPastEnd = offset + length > locatedBlocks.getFileLength();
|
|
||||||
/* if requested length is greater than current file length
|
|
||||||
* then, it could possibly be from the current block being
|
|
||||||
* written to. First get the finalized block range and then
|
|
||||||
* if necessary, get the length of last block being written
|
|
||||||
* to.
|
|
||||||
*/
|
|
||||||
if (readPastEnd)
|
|
||||||
length = locatedBlocks.getFileLength() - offset;
|
|
||||||
|
|
||||||
blocks = getFinalizedBlockRange(offset, length);
|
final List<LocatedBlock> blocks;
|
||||||
/* requested length is greater than what finalized blocks
|
final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
|
||||||
* have.
|
final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
|
||||||
*/
|
final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
|
||||||
if (readPastEnd)
|
|
||||||
|
if (readOffsetWithinCompleteBlk) {
|
||||||
|
//get the blocks of finalized (completed) block range
|
||||||
|
blocks = getFinalizedBlockRange(offset,
|
||||||
|
Math.min(length, lengthOfCompleteBlk - offset));
|
||||||
|
} else {
|
||||||
|
blocks = new ArrayList<LocatedBlock>(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the blocks from incomplete block range
|
||||||
|
if (readLengthPastCompleteBlk) {
|
||||||
blocks.add(locatedBlocks.getLastLocatedBlock());
|
blocks.add(locatedBlocks.getLastLocatedBlock());
|
||||||
}
|
}
|
||||||
|
|
||||||
return blocks;
|
return blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ public class TestWriteRead {
|
||||||
|
|
||||||
private static final int BUFFER_SIZE = 8192 * 100;
|
private static final int BUFFER_SIZE = 8192 * 100;
|
||||||
private static final String ROOT_DIR = "/tmp/";
|
private static final String ROOT_DIR = "/tmp/";
|
||||||
|
private static final long blockSize = 1024*100;
|
||||||
|
|
||||||
// command-line options. Different defaults for unit test vs real cluster
|
// command-line options. Different defaults for unit test vs real cluster
|
||||||
String filenameOption = ROOT_DIR + "fileX1";
|
String filenameOption = ROOT_DIR + "fileX1";
|
||||||
|
@ -69,7 +70,7 @@ public class TestWriteRead {
|
||||||
LOG.info("initJunitModeTest");
|
LOG.info("initJunitModeTest");
|
||||||
|
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); // 100K
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); // 100K
|
||||||
// blocksize
|
// blocksize
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
@ -99,15 +100,14 @@ public class TestWriteRead {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Junit Test reading while writing. */
|
/** Junit Test reading while writing. */
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteReadSeq() throws IOException {
|
public void testWriteReadSeq() throws IOException {
|
||||||
useFCOption = false;
|
useFCOption = false;
|
||||||
positionReadOption = false;
|
positionReadOption = false;
|
||||||
String fname = filenameOption;
|
String fname = filenameOption;
|
||||||
|
long rdBeginPos = 0;
|
||||||
// need to run long enough to fail: takes 25 to 35 seec on Mac
|
// need to run long enough to fail: takes 25 to 35 seec on Mac
|
||||||
int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
|
int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
|
||||||
LOG.info("Summary status from test1: status= " + stat);
|
LOG.info("Summary status from test1: status= " + stat);
|
||||||
Assert.assertEquals(0, stat);
|
Assert.assertEquals(0, stat);
|
||||||
}
|
}
|
||||||
|
@ -117,14 +117,27 @@ public class TestWriteRead {
|
||||||
public void testWriteReadPos() throws IOException {
|
public void testWriteReadPos() throws IOException {
|
||||||
String fname = filenameOption;
|
String fname = filenameOption;
|
||||||
positionReadOption = true; // position read
|
positionReadOption = true; // position read
|
||||||
int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
|
long rdBeginPos = 0;
|
||||||
|
int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE, rdBeginPos);
|
||||||
Assert.assertEquals(0, stat);
|
Assert.assertEquals(0, stat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Junit Test position read of the current block being written. */
|
||||||
|
@Test
|
||||||
|
public void testReadPosCurrentBlock() throws IOException {
|
||||||
|
String fname = filenameOption;
|
||||||
|
positionReadOption = true; // position read
|
||||||
|
int wrChunkSize = (int)(blockSize) + (int)(blockSize/2);
|
||||||
|
long rdBeginPos = blockSize+1;
|
||||||
|
int numTimes=5;
|
||||||
|
int stat = testWriteAndRead(fname, numTimes, wrChunkSize, rdBeginPos);
|
||||||
|
Assert.assertEquals(0, stat);
|
||||||
|
}
|
||||||
|
|
||||||
// equivalent of TestWriteRead1
|
// equivalent of TestWriteRead1
|
||||||
private int clusterTestWriteRead1() throws IOException {
|
private int clusterTestWriteRead1() throws IOException {
|
||||||
int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
|
long rdBeginPos = 0;
|
||||||
|
int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption, rdBeginPos);
|
||||||
return stat;
|
return stat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,10 +146,9 @@ public class TestWriteRead {
|
||||||
* Return number of bytes read.
|
* Return number of bytes read.
|
||||||
* Support both sequential read and position read.
|
* Support both sequential read and position read.
|
||||||
*/
|
*/
|
||||||
private long readData(String fname, byte[] buffer, long byteExpected)
|
private long readData(String fname, byte[] buffer, long byteExpected, long beginPosition)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long totalByteRead = 0;
|
long totalByteRead = 0;
|
||||||
long beginPosition = 0;
|
|
||||||
Path path = getFullyQualifiedPath(fname);
|
Path path = getFullyQualifiedPath(fname);
|
||||||
|
|
||||||
FSDataInputStream in = null;
|
FSDataInputStream in = null;
|
||||||
|
@ -263,7 +275,7 @@ public class TestWriteRead {
|
||||||
* After each iteration of write, do a read of the file from begin to end.
|
* After each iteration of write, do a read of the file from begin to end.
|
||||||
* Return 0 on success, else number of failure.
|
* Return 0 on success, else number of failure.
|
||||||
*/
|
*/
|
||||||
private int testWriteAndRead(String fname, int loopN, int chunkSize)
|
private int testWriteAndRead(String fname, int loopN, int chunkSize, long readBeginPosition)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
int countOfFailures = 0;
|
int countOfFailures = 0;
|
||||||
|
@ -324,7 +336,7 @@ public class TestWriteRead {
|
||||||
+ ". TotalByteVisible = " + totalByteVisible + " to file "
|
+ ". TotalByteVisible = " + totalByteVisible + " to file "
|
||||||
+ fname);
|
+ fname);
|
||||||
}
|
}
|
||||||
byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
|
byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
|
||||||
|
|
||||||
String readmsg = "Written=" + totalByteWritten + " ; Expected Visible="
|
String readmsg = "Written=" + totalByteWritten + " ; Expected Visible="
|
||||||
+ totalByteVisible + " ; Got Visible=" + byteVisibleToRead
|
+ totalByteVisible + " ; Got Visible=" + byteVisibleToRead
|
||||||
|
@ -353,7 +365,7 @@ public class TestWriteRead {
|
||||||
|
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
|
byteVisibleToRead = readData(fname, inBuffer, totalByteVisible, readBeginPosition);
|
||||||
|
|
||||||
String readmsg2 = "Written=" + totalByteWritten + " ; Expected Visible="
|
String readmsg2 = "Written=" + totalByteWritten + " ; Expected Visible="
|
||||||
+ totalByteVisible + " ; Got Visible=" + byteVisibleToRead
|
+ totalByteVisible + " ; Got Visible=" + byteVisibleToRead
|
||||||
|
|
Loading…
Reference in New Issue