HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.
This commit is contained in:
parent
c88ec54588
commit
33c62d2d19
|
@ -511,34 +511,37 @@ public class DFSInputStream extends FSInputStream
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// search cached blocks first
|
// search cached blocks first
|
||||||
int targetBlockIdx = locatedBlocks.findBlock(offset);
|
blk = fetchBlockAt(offset, 0, true);
|
||||||
if (targetBlockIdx < 0) { // block is not cached
|
|
||||||
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
|
||||||
// fetch more blocks
|
|
||||||
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
|
|
||||||
assert (newBlocks != null) : "Could not find target position " + offset;
|
|
||||||
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
|
||||||
}
|
|
||||||
blk = locatedBlocks.get(targetBlockIdx);
|
|
||||||
}
|
}
|
||||||
return blk;
|
return blk;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Fetch a block from namenode and cache it */
|
/** Fetch a block from namenode and cache it */
|
||||||
protected void fetchBlockAt(long offset) throws IOException {
|
protected LocatedBlock fetchBlockAt(long offset) throws IOException {
|
||||||
|
return fetchBlockAt(offset, 0, false); // don't use cache
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Return the block at offset from the cache; fetch it from namenode and cache it if it is not cached or useCache is false */
|
||||||
|
private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
|
||||||
|
throws IOException {
|
||||||
synchronized(infoLock) {
|
synchronized(infoLock) {
|
||||||
int targetBlockIdx = locatedBlocks.findBlock(offset);
|
int targetBlockIdx = locatedBlocks.findBlock(offset);
|
||||||
if (targetBlockIdx < 0) { // block is not cached
|
if (targetBlockIdx < 0) { // block is not cached
|
||||||
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
||||||
|
useCache = false;
|
||||||
}
|
}
|
||||||
// fetch blocks
|
if (!useCache) { // fetch blocks
|
||||||
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
|
final LocatedBlocks newBlocks = (length == 0)
|
||||||
if (newBlocks == null) {
|
? dfsClient.getLocatedBlocks(src, offset)
|
||||||
throw new IOException("Could not find target position " + offset);
|
: dfsClient.getLocatedBlocks(src, offset, length);
|
||||||
|
if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
|
||||||
|
throw new EOFException("Could not find target position " + offset);
|
||||||
}
|
}
|
||||||
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
||||||
}
|
}
|
||||||
|
return locatedBlocks.get(targetBlockIdx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -592,28 +595,15 @@ public class DFSInputStream extends FSInputStream
|
||||||
assert (locatedBlocks != null) : "locatedBlocks is null";
|
assert (locatedBlocks != null) : "locatedBlocks is null";
|
||||||
List<LocatedBlock> blockRange = new ArrayList<>();
|
List<LocatedBlock> blockRange = new ArrayList<>();
|
||||||
// search cached blocks first
|
// search cached blocks first
|
||||||
int blockIdx = locatedBlocks.findBlock(offset);
|
|
||||||
if (blockIdx < 0) { // block is not cached
|
|
||||||
blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
|
|
||||||
}
|
|
||||||
long remaining = length;
|
long remaining = length;
|
||||||
long curOff = offset;
|
long curOff = offset;
|
||||||
while(remaining > 0) {
|
while(remaining > 0) {
|
||||||
LocatedBlock blk = null;
|
LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
|
||||||
if(blockIdx < locatedBlocks.locatedBlockCount())
|
|
||||||
blk = locatedBlocks.get(blockIdx);
|
|
||||||
if (blk == null || curOff < blk.getStartOffset()) {
|
|
||||||
LocatedBlocks newBlocks;
|
|
||||||
newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
|
|
||||||
locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
assert curOff >= blk.getStartOffset() : "Block not found";
|
assert curOff >= blk.getStartOffset() : "Block not found";
|
||||||
blockRange.add(blk);
|
blockRange.add(blk);
|
||||||
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
|
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
|
||||||
remaining -= bytesRead;
|
remaining -= bytesRead;
|
||||||
curOff += bytesRead;
|
curOff += bytesRead;
|
||||||
blockIdx++;
|
|
||||||
}
|
}
|
||||||
return blockRange;
|
return blockRange;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -29,7 +30,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ChecksumException;
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
@ -494,6 +496,54 @@ public class TestPread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTruncateWhileReading() throws Exception {
|
||||||
|
Path path = new Path("/testfile");
|
||||||
|
final int blockSize = 512;
|
||||||
|
|
||||||
|
// prevent initial pre-fetch of multiple block locations
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, blockSize);
|
||||||
|
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
try {
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
// create multi-block file
|
||||||
|
FSDataOutputStream dos =
|
||||||
|
fs.create(path, true, blockSize, (short)1, blockSize);
|
||||||
|
dos.write(new byte[blockSize*3]);
|
||||||
|
dos.close();
|
||||||
|
// truncate a file while it's open
|
||||||
|
final FSDataInputStream dis = fs.open(path);
|
||||||
|
while (!fs.truncate(path, 10)) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
// verify that reading bytes outside the initial pre-fetch does
|
||||||
|
// not send the client into an infinite loop querying locations.
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
Future<?> future = executor.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException {
|
||||||
|
// read from 2nd block.
|
||||||
|
dis.readFully(blockSize, new byte[4]);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
future.get(4, TimeUnit.SECONDS);
|
||||||
|
Assert.fail();
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
assertTrue(ee.toString(), ee.getCause() instanceof EOFException);
|
||||||
|
} finally {
|
||||||
|
future.cancel(true);
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
new TestPread().testPreadDFS();
|
new TestPread().testPreadDFS();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue