HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2017-02-10 12:27:08 -06:00
parent 2b7a7bbe0f
commit 07a5184f74
2 changed files with 70 additions and 29 deletions

View File

@ -421,33 +421,36 @@ public class DFSInputStream extends FSInputStream
}
else {
// search cached blocks first
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
// fetch more blocks
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
assert (newBlocks != null) : "Could not find target position " + offset;
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
blk = locatedBlocks.get(targetBlockIdx);
blk = fetchBlockAt(offset, 0, true);
}
return blk;
}
}
/** Fetch a block from namenode and cache it */
protected void fetchBlockAt(long offset) throws IOException {
protected LocatedBlock fetchBlockAt(long offset) throws IOException {
return fetchBlockAt(offset, 0, false); // don't use cache
}
/** Fetch a block from namenode and cache it */
private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
useCache = false;
}
// fetch blocks
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
if (newBlocks == null) {
throw new IOException("Could not find target position " + offset);
if (!useCache) { // fetch blocks
final LocatedBlocks newBlocks = (length == 0)
? dfsClient.getLocatedBlocks(src, offset)
: dfsClient.getLocatedBlocks(src, offset, length);
if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
throw new EOFException("Could not find target position " + offset);
}
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
return locatedBlocks.get(targetBlockIdx);
}
}
@ -502,28 +505,15 @@ public class DFSInputStream extends FSInputStream
assert (locatedBlocks != null) : "locatedBlocks is null";
List<LocatedBlock> blockRange = new ArrayList<>();
// search cached blocks first
int blockIdx = locatedBlocks.findBlock(offset);
if (blockIdx < 0) { // block is not cached
blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
}
long remaining = length;
long curOff = offset;
while(remaining > 0) {
LocatedBlock blk = null;
if(blockIdx < locatedBlocks.locatedBlockCount())
blk = locatedBlocks.get(blockIdx);
if (blk == null || curOff < blk.getStartOffset()) {
LocatedBlocks newBlocks;
newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
continue;
}
LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
assert curOff >= blk.getStartOffset() : "Block not found";
blockRange.add(blk);
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
remaining -= bytesRead;
curOff += bytesRead;
blockIdx++;
}
return blockRange;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
@ -42,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -491,6 +494,54 @@ public class TestPread {
}
}
@Test
public void testTruncateWhileReading() throws Exception {
Path path = new Path("/testfile");
final int blockSize = 512;
// prevent initial pre-fetch of multiple block locations
Configuration conf = new Configuration();
conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, blockSize);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
DistributedFileSystem fs = cluster.getFileSystem();
// create multi-block file
FSDataOutputStream dos =
fs.create(path, true, blockSize, (short)1, blockSize);
dos.write(new byte[blockSize*3]);
dos.close();
// truncate a file while it's open
final FSDataInputStream dis = fs.open(path);
while (!fs.truncate(path, 10)) {
Thread.sleep(10);
}
// verify that reading bytes outside the initial pre-fetch do
// not send the client into an infinite loop querying locations.
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<?> future = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
// read from 2nd block.
dis.readFully(blockSize, new byte[4]);
return null;
}
});
try {
future.get(4, TimeUnit.SECONDS);
Assert.fail();
} catch (ExecutionException ee) {
assertTrue(ee.toString(), ee.getCause() instanceof EOFException);
} finally {
future.cancel(true);
executor.shutdown();
}
} finally {
cluster.shutdown();
}
}
public static void main(String[] args) throws Exception {
new TestPread().testPreadDFS();
}