HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 915cbc91c0)
This commit is contained in:
Stephen O'Donnell 2019-09-02 09:46:14 -07:00 committed by Wei-Chiu Chuang
parent 84b2fe4b10
commit 61fd1a74a1
7 changed files with 275 additions and 14 deletions

View File

@ -143,9 +143,12 @@ public class DataChecksum implements Checksum {
* Creates a DataChecksum from HEADER_LEN bytes from arr[offset]. * Creates a DataChecksum from HEADER_LEN bytes from arr[offset].
* @return DataChecksum of the type in the array or null in case of an error. * @return DataChecksum of the type in the array or null in case of an error.
*/ */
public static DataChecksum newDataChecksum( byte bytes[], int offset ) { public static DataChecksum newDataChecksum(byte[] bytes, int offset)
throws IOException {
if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) { if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
return null; throw new InvalidChecksumSizeException("Could not create DataChecksum "
+ " from the byte array of length " + bytes.length
+ " and offset "+ offset);
} }
// like readInt(): // like readInt():
@ -153,7 +156,14 @@ public class DataChecksum implements Checksum {
( (bytes[offset+2] & 0xff) << 16 ) | ( (bytes[offset+2] & 0xff) << 16 ) |
( (bytes[offset+3] & 0xff) << 8 ) | ( (bytes[offset+3] & 0xff) << 8 ) |
( (bytes[offset+4] & 0xff) ); ( (bytes[offset+4] & 0xff) );
return newDataChecksum( Type.valueOf(bytes[offset]), bytesPerChecksum ); DataChecksum csum = newDataChecksum(mapByteToChecksumType(bytes[offset]),
bytesPerChecksum);
if (csum == null) {
throw new InvalidChecksumSizeException(("Could not create DataChecksum "
+ " from the byte array of length " + bytes.length
+ " and bytesPerCheckSum of "+ bytesPerChecksum));
}
return csum;
} }
/** /**
@ -164,7 +174,7 @@ public class DataChecksum implements Checksum {
throws IOException { throws IOException {
int type = in.readByte(); int type = in.readByte();
int bpc = in.readInt(); int bpc = in.readInt();
DataChecksum summer = newDataChecksum(Type.valueOf(type), bpc ); DataChecksum summer = newDataChecksum(mapByteToChecksumType(type), bpc);
if ( summer == null ) { if ( summer == null ) {
throw new InvalidChecksumSizeException("Could not create DataChecksum " throw new InvalidChecksumSizeException("Could not create DataChecksum "
+ "of type " + type + " with bytesPerChecksum " + bpc); + "of type " + type + " with bytesPerChecksum " + bpc);
@ -172,6 +182,16 @@ public class DataChecksum implements Checksum {
return summer; return summer;
} }
private static Type mapByteToChecksumType(int type)
throws InvalidChecksumSizeException{
try {
return Type.valueOf(type);
} catch (IllegalArgumentException e) {
throw new InvalidChecksumSizeException("The value "+type+" does not map"+
" to a valid checksum Type");
}
}
/** /**
* Writes the checksum header to the output stream <i>out</i>. * Writes the checksum header to the output stream <i>out</i>.
*/ */

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.InvalidChecksumSizeException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -119,13 +120,19 @@ public class BlockMetadataHeader {
ByteBuffer buf = ByteBuffer.wrap(arr); ByteBuffer buf = ByteBuffer.wrap(arr);
while (buf.hasRemaining()) { while (buf.hasRemaining()) {
if (fc.read(buf, 0) <= 0) { if (fc.read(buf, buf.position()) <= 0) {
throw new EOFException("unexpected EOF while reading " + throw new CorruptMetaHeaderException("EOF while reading header from "+
"metadata file header"); "the metadata file. The meta file may be truncated or corrupt");
} }
} }
short version = (short)((arr[0] << 8) | (arr[1] & 0xff)); short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2); DataChecksum dataChecksum;
try {
dataChecksum = DataChecksum.newDataChecksum(arr, 2);
} catch (InvalidChecksumSizeException e) {
throw new CorruptMetaHeaderException("The block meta file header is "+
"corrupt", e);
}
return new BlockMetadataHeader(version, dataChecksum); return new BlockMetadataHeader(version, dataChecksum);
} }
@ -136,7 +143,14 @@ public class BlockMetadataHeader {
*/ */
public static BlockMetadataHeader readHeader(DataInputStream in) public static BlockMetadataHeader readHeader(DataInputStream in)
throws IOException { throws IOException {
return readHeader(in.readShort(), in); try {
return readHeader(in.readShort(), in);
} catch (EOFException eof) {
// The attempt to read the header threw EOF, indicating there are not
// enough bytes in the meta file for the header.
throw new CorruptMetaHeaderException("EOF while reading header from meta"+
". The meta file may be truncated or corrupt", eof);
}
} }
/** /**
@ -170,7 +184,13 @@ public class BlockMetadataHeader {
// Version is already read. // Version is already read.
private static BlockMetadataHeader readHeader(short version, private static BlockMetadataHeader readHeader(short version,
DataInputStream in) throws IOException { DataInputStream in) throws IOException {
DataChecksum checksum = DataChecksum.newDataChecksum(in); DataChecksum checksum = null;
try {
checksum = DataChecksum.newDataChecksum(in);
} catch (InvalidChecksumSizeException e) {
throw new CorruptMetaHeaderException("The block meta file header is "+
"corrupt", e);
}
return new BlockMetadataHeader(version, checksum); return new BlockMetadataHeader(version, checksum);
} }

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
/**
* Exception object that is thrown when the block metadata file is corrupt.
*/
public class CorruptMetaHeaderException extends IOException {
CorruptMetaHeaderException(String msg) {
super(msg);
}
CorruptMetaHeaderException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -325,13 +325,22 @@ class BlockSender implements java.io.Closeable {
// storage. The header is important for determining the checksum // storage. The header is important for determining the checksum
// type later when lazy persistence copies the block to non-transient // type later when lazy persistence copies the block to non-transient
// storage and computes the checksum. // storage and computes the checksum.
int expectedHeaderSize = BlockMetadataHeader.getHeaderSize();
if (!replica.isOnTransientStorage() && if (!replica.isOnTransientStorage() &&
metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) { metaIn.getLength() >= expectedHeaderSize) {
checksumIn = new DataInputStream(new BufferedInputStream( checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, IO_FILE_BUFFER_SIZE)); metaIn, IO_FILE_BUFFER_SIZE));
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block); csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
keepMetaInOpen = true; keepMetaInOpen = true;
} else if (!replica.isOnTransientStorage() &&
metaIn.getLength() < expectedHeaderSize) {
LOG.warn("The meta file length {} is less than the expected " +
"header length {}, indicating the meta file is corrupt",
metaIn.getLength(), expectedHeaderSize);
throw new CorruptMetaHeaderException("The meta file length "+
metaIn.getLength()+" is less than the expected length "+
expectedHeaderSize);
} }
} else { } else {
LOG.warn("Could not find metadata file for " + block); LOG.warn("Could not find metadata file for " + block);

View File

@ -207,7 +207,6 @@ import org.apache.hadoop.tracing.TracerConfigurationManager;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.InvalidChecksumSizeException;
import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -3438,7 +3437,7 @@ public class DataNode extends ReconfigurableBase
void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) { void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
|| e instanceof InvalidChecksumSizeException); || e instanceof CorruptMetaHeaderException);
if (!isBadBlock) { if (!isBadBlock) {
return; return;

View File

@ -647,6 +647,12 @@ class DataXceiver extends Receiver implements Runnable {
dnR, block, remoteAddress, ioe); dnR, block, remoteAddress, ioe);
incrDatanodeNetworkErrors(); incrDatanodeNetworkErrors();
} }
// Normally the client reports a bad block to the NN. However if the
// meta file is corrupt or an disk error occurs (EIO), then the client
// never gets a chance to do validation, and hence will never report
// the block as bad. For some classes of IO exception, the DN should
// report the block as bad, via the handleBadBlock() method
datanode.handleBadBlock(block, ioe, false);
throw ioe; throw ioe;
} finally { } finally {
IOUtils.closeStream(blockSender); IOUtils.closeStream(blockSender);
@ -1118,6 +1124,12 @@ class DataXceiver extends Receiver implements Runnable {
isOpSuccess = false; isOpSuccess = false;
LOG.info("opCopyBlock {} received exception {}", block, ioe.toString()); LOG.info("opCopyBlock {} received exception {}", block, ioe.toString());
incrDatanodeNetworkErrors(); incrDatanodeNetworkErrors();
// Normally the client reports a bad block to the NN. However if the
// meta file is corrupt or an disk error occurs (EIO), then the client
// never gets a chance to do validation, and hence will never report
// the block as bad. For some classes of IO exception, the DN should
// report the block as bad, via the handleBadBlock() method
datanode.handleBadBlock(block, ioe, false);
throw ioe; throw ioe;
} finally { } finally {
dataXceiverServer.balanceThrottler.release(); dataXceiverServer.balanceThrottler.release();

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.RandomAccessFile;
import static org.junit.Assert.assertEquals;
/**
* Tests to ensure that a block is not read successfully from a datanode
* when it has a corrupt metadata file.
*/
public class TestCorruptMetadataFile {
private MiniDFSCluster cluster;
private MiniDFSCluster.Builder clusterBuilder;
private Configuration conf;
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
// Reduce block acquire retries as we only have 1 DN and it allows the
// test to run faster
conf.setInt(
HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 1);
clusterBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(1);
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Test(timeout=60000)
public void testReadBlockFailsWhenMetaIsCorrupt() throws Exception {
cluster = clusterBuilder.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
DataNode dn0 = cluster.getDataNodes().get(0);
Path filePath = new Path("test.dat");
FSDataOutputStream out = fs.create(filePath, (short) 1);
out.write(1);
out.hflush();
out.close();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
File metadataFile = cluster.getBlockMetadataFile(0, block);
// First ensure we can read the file OK
FSDataInputStream in = fs.open(filePath);
in.readByte();
in.close();
// Now truncate the meta file, and ensure the data is not read OK
RandomAccessFile raFile = new RandomAccessFile(metadataFile, "rw");
raFile.setLength(0);
FSDataInputStream intrunc = fs.open(filePath);
LambdaTestUtils.intercept(BlockMissingException.class,
() -> intrunc.readByte());
intrunc.close();
// Write 11 bytes to the file, but an invalid header
raFile.write("12345678901".getBytes());
assertEquals(11, raFile.length());
FSDataInputStream ininvalid = fs.open(filePath);
LambdaTestUtils.intercept(BlockMissingException.class,
() -> ininvalid.readByte());
ininvalid.close();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return cluster.getNameNode().getNamesystem()
.getBlockManager().getCorruptBlocks() == 1;
}
}, 100, 5000);
raFile.close();
}
/**
* This test create a sample block meta file and then attempts to load it
* using BlockMetadataHeader to ensure it can load a valid file and that it
* throws a CorruptMetaHeaderException when the header is invalid.
* @throws Exception
*/
@Test
public void testBlockMetaDataHeaderPReadHandlesCorruptMetaFile()
throws Exception {
File testDir = GenericTestUtils.getTestDir();
RandomAccessFile raFile = new RandomAccessFile(
new File(testDir, "metafile"), "rw");
// Write a valid header into the file
// Version
raFile.writeShort((short)1);
// Checksum type
raFile.writeByte(1);
// Bytes per checksum
raFile.writeInt(512);
// We should be able to get the header with no exceptions
BlockMetadataHeader header =
BlockMetadataHeader.preadHeader(raFile.getChannel());
// Now truncate the meta file to zero and ensure an exception is raised
raFile.setLength(0);
LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
() -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
// Now write a partial valid header to sure an exception is thrown
// if the header cannot be fully read
// Version
raFile.writeShort((short)1);
// Checksum type
raFile.writeByte(1);
LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
() -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
// Finally write the expected 7 bytes, but invalid data
raFile.setLength(0);
raFile.write("1234567".getBytes());
LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
() -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
raFile.close();
}
}