HDFS-7744. Fix potential NPE in DFSInputStream after setDropBehind or setReadahead is called (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2015-02-09 20:16:41 -08:00
parent 260b5e32c4
commit a9dc5cd706
3 changed files with 36 additions and 8 deletions

View File

@ -890,6 +890,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7756. Restore method signature for LocatedBlock#getLocations(). (Ted
Yu via yliu)
HDFS-7744. Fix potential NPE in DFSInputStream after setDropBehind or
setReadahead is called (cmccabe)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -580,10 +580,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
// Will be getting a new BlockReader.
if (blockReader != null) {
blockReader.close();
blockReader = null;
}
closeCurrentBlockReader();
//
// Connect to best DataNode for desired Block, with potential offset
@ -686,10 +683,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
if (blockReader != null) {
blockReader.close();
blockReader = null;
}
closeCurrentBlockReader();
super.close();
}
@ -1649,6 +1643,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.error("error closing blockReader", e);
}
blockReader = null;
blockEnd = -1;
}
@Override

View File

@ -369,4 +369,34 @@ public class TestCachingStrategy {
}
}
}
@Test(timeout=120000)
public void testSeekAfterSetDropBehind() throws Exception {
// start a cluster
LOG.info("testSeekAfterSetDropBehind");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
// verify that we can seek after setDropBehind
FSDataInputStream fis = fs.open(new Path(TEST_PATH));
try {
Assert.assertTrue(fis.read() != -1); // create BlockReader
fis.setDropBehind(false); // clear BlockReader
fis.seek(2); // seek
} finally {
fis.close();
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}