HDFS-3067. NPE in DFSInputStream.readBuffer if read is repeated on corrupted block. Contributed by Henry Robinson.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1301182 13f79535-47bb-0310-9956-ffa450edef68
parent 1177d4edc2
commit 6d96a28a08
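Before the diff itself, a minimal self-contained sketch of the failure mode. Everything here is hypothetical (the class name, the Object stand-ins) and only echoes the real DFSInputStream fields: a checksum error clears the chosen datanode, and because the old guard re-seeks only when pos passes blockEnd, a repeated read dereferences the cleared node and raises NullPointerException instead of a second checksum error.

import java.io.IOException;

// A sketch of the pre-fix behavior, not the real DFSInputStream.
public class Hdfs3067BugSketch {
  private Object currentNode = new Object(); // stand-in for the DatanodeInfo
  private long pos = 0, blockEnd = 511;      // pos never passes blockEnd here

  int readBuffer() throws IOException {
    if (pos > blockEnd) {                    // old guard: never true here, so
      currentNode = new Object();            // no re-seek after a failure
    }
    currentNode.hashCode();                  // models using the node: NPEs on retry
    currentNode = null;                      // corrupt replica: drop the node
    throw new IOException("Checksum error"); // models ChecksumException
  }

  public static void main(String[] args) {
    Hdfs3067BugSketch in = new Hdfs3067BugSketch();
    for (int i = 0; i < 2; i++) {
      try {
        in.readBuffer();
      } catch (Exception e) {
        // Prints "Checksum error" for attempt 0, NullPointerException for 1.
        System.out.println("attempt " + i + ": " + e);
      }
    }
  }
}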
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -87,7 +87,10 @@ Trunk (unreleased changes)
     HDFS-3037. TestMulitipleNNDataBlockScanner#testBlockScannerAfterRestart is
     racy. (atm)
 
-    HDFS-2966 TestNameNodeMetrics tests can fail under load. (stevel)
+    HDFS-2966. TestNameNodeMetrics tests can fail under load. (stevel)
+
+    HDFS-3067. NPE in DFSInputStream.readBuffer if read is repeated on
+    corrupted block. (Henry Robinson via atm)
 
 Release 0.23.3 - UNRELEASED
 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -538,7 +538,9 @@ public class DFSInputStream extends FSInputStream {
     int retries = 2;
     while (retries > 0) {
       try {
-        if (pos > blockEnd) {
+        // currentNode can be left as null if previous read had a checksum
+        // error on the same block. See HDFS-3067
+        if (pos > blockEnd || currentNode == null) {
           currentNode = blockSeekTo(pos);
         }
         int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
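Taken on its own, the patch adds one condition to the guard: a currentNode cleared by a previous checksum failure now also forces a re-seek. Here is the same hedged sketch as above with the fixed guard; blockSeekTo is a hypothetical stand-in for the real datanode re-selection, not the actual method.

import java.io.IOException;

// The sketch from above, with the patched guard applied.
public class Hdfs3067FixSketch {
  private Object currentNode = new Object(); // stand-in for the DatanodeInfo
  private long pos = 0, blockEnd = 511;      // pos never passes blockEnd here
  private boolean replicaCorrupt = true;

  int readBuffer() throws IOException {
    // Fixed guard: a node cleared by a prior failure also triggers a re-seek.
    if (pos > blockEnd || currentNode == null) {
      currentNode = blockSeekTo(pos);
    }
    if (replicaCorrupt) {
      currentNode = null;                      // drop the node, as on a real
      throw new IOException("Checksum error"); // ChecksumException path
    }
    return 0;
  }

  private Object blockSeekTo(long target) {
    return new Object();                       // stand-in for node selection
  }

  public static void main(String[] args) {
    Hdfs3067FixSketch in = new Hdfs3067FixSketch();
    for (int i = 0; i < 2; i++) {
      try {
        in.readBuffer();
      } catch (IOException e) {
        // Both attempts now surface "Checksum error"; pre-fix, the second
        // attempt dereferenced the null currentNode instead.
        System.out.println("attempt " + i + ": " + e.getMessage());
      }
    }
  }
}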
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -64,6 +65,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.mockito.internal.stubbing.answers.ThrowsException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -649,5 +651,52 @@ public class TestDFSClientRetries extends TestCase {
       server.stop();
     }
   }
+
+  /**
+   * Test that checksum failures are recovered from by the next read on the same
+   * DFSInputStream. Corruption information is not persisted from read call to
+   * read call, so the client should expect consecutive calls to behave the same
+   * way. See HDFS-3067.
+   */
+  public void testRetryOnChecksumFailure()
+      throws UnresolvedLinkException, IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      final short REPL_FACTOR = 1;
+      final long FILE_LENGTH = 512L;
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      Path path = new Path("/corrupted");
+
+      DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
+      DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
+      int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+      assertEquals("All replicas not corrupted", REPL_FACTOR,
+          blockFilesCorrupted);
+
+      InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+      DFSClient client = new DFSClient(nnAddr, conf);
+      DFSInputStream dis = client.open(path.toString());
+      byte[] arr = new byte[(int)FILE_LENGTH];
+      for (int i = 0; i < 2; ++i) {
+        try {
+          dis.read(arr, 0, (int)FILE_LENGTH);
+          fail("Expected ChecksumException not thrown");
+        } catch (Exception ex) {
+          GenericTestUtils.assertExceptionContains(
+              "Checksum error", ex);
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
 
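For applications, the observable effect of the fix is that retrying a read on the same stream after a checksum failure is safe. A hedged caller-side sketch against the public FileSystem API; the path, retry count, and class name are illustrative and not from this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: retry a read on the same input stream after a checksum failure.
public class RetryReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    byte[] buf = new byte[512];
    FSDataInputStream in = fs.open(new Path("/corrupted")); // illustrative path
    try {
      for (int attempt = 0; attempt < 2; attempt++) {
        try {
          in.read(buf, 0, buf.length);
          break; // success, e.g. once a healthy replica is chosen
        } catch (ChecksumException e) {
          // Before HDFS-3067 a second attempt could NPE inside
          // DFSInputStream.readBuffer; with the fix it re-seeks and either
          // succeeds or reports the checksum error again.
          System.err.println("checksum failure on attempt " + attempt);
        }
      }
    } finally {
      in.close();
    }
  }
}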