From 61fd1a74a1692db51815fa42f618b451c9946a0c Mon Sep 17 00:00:00 2001
From: Stephen O'Donnell <sodonnell@cloudera.com>
Date: Mon, 2 Sep 2019 09:46:14 -0700
Subject: [PATCH] HDFS-14706. Checksums are not checked if block meta file is
 less than 7 bytes. Contributed by Stephen O'Donnell.

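The block meta file begins with a 7 byte header: a 2 byte version, a
1 byte checksum type and a 4 byte bytesPerChecksum. Previously, if the
meta file was shorter than this header, BlockSender silently skipped
setting up the checksum and served the block without verification, and
DataChecksum.newDataChecksum(byte[], int) returned null rather than
failing cleanly. With this change both cases surface as
CorruptMetaHeaderException and the DataNode reports the replica as bad
via handleBadBlock().

For illustration only (not part of the patch), a valid header can be
written the same way the new TestCorruptMetadataFile does; the file
name here is made up for the example:

    // uses java.io.RandomAccessFile; the path is hypothetical
    try (RandomAccessFile meta = new RandomAccessFile("blk_1234.meta", "rw")) {
      meta.writeShort(1);  // header version
      meta.writeByte(1);   // checksum type (1 = CRC32)
      meta.writeInt(512);  // bytes per checksum
    }                      // anything shorter than these 7 bytes is corrupt
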
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 915cbc91c0a12cc7b4d3ef4ea951941defbbcb33)
---
 .../org/apache/hadoop/util/DataChecksum.java  |  28 ++-
 .../server/datanode/BlockMetadataHeader.java  |  32 +++-
 .../datanode/CorruptMetaHeaderException.java  |  36 ++++
 .../hdfs/server/datanode/BlockSender.java     |  13 +-
 .../hadoop/hdfs/server/datanode/DataNode.java |   3 +-
 .../hdfs/server/datanode/DataXceiver.java     |  12 ++
 .../datanode/TestCorruptMetadataFile.java     | 165 ++++++++++++++++++
 7 files changed, 275 insertions(+), 14 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CorruptMetaHeaderException.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCorruptMetadataFile.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 06ef8acc523..32a0adca197 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -143,9 +143,12 @@ public class DataChecksum implements Checksum {
    * Creates a DataChecksum from HEADER_LEN bytes from arr[offset].
    * @return DataChecksum of the type in the array or null in case of an error.
    */
-  public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
+  public static DataChecksum newDataChecksum(byte[] bytes, int offset)
+      throws IOException {
     if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
-      return null;
+      throw new InvalidChecksumSizeException("Could not create DataChecksum "
+          + " from the byte array of length " + bytes.length
+          + " and offset "+ offset);
     }
     
     // like readInt():
@@ -153,7 +156,14 @@ public class DataChecksum implements Checksum {
                            ( (bytes[offset+2] & 0xff) << 16 ) |
                            ( (bytes[offset+3] & 0xff) << 8 )  |
                            ( (bytes[offset+4] & 0xff) );
-    return newDataChecksum( Type.valueOf(bytes[offset]), bytesPerChecksum );
+    DataChecksum csum = newDataChecksum(mapByteToChecksumType(bytes[offset]),
+        bytesPerChecksum);
+    if (csum == null) {
+      throw new InvalidChecksumSizeException(("Could not create DataChecksum "
+          + " from the byte array of length " + bytes.length
+          + " and bytesPerCheckSum of "+ bytesPerChecksum));
+    }
+    return csum;
   }
   
   /**
@@ -164,13 +174,23 @@ public class DataChecksum implements Checksum {
                                  throws IOException {
     int type = in.readByte();
     int bpc = in.readInt();
-    DataChecksum summer = newDataChecksum(Type.valueOf(type), bpc );
+    DataChecksum summer = newDataChecksum(mapByteToChecksumType(type), bpc);
     if ( summer == null ) {
       throw new InvalidChecksumSizeException("Could not create DataChecksum "
           + "of type " + type + " with bytesPerChecksum " + bpc);
     }
     return summer;
   }
+
+  private static Type mapByteToChecksumType(int type)
+      throws InvalidChecksumSizeException {
+    try {
+      return Type.valueOf(type);
+    } catch (IllegalArgumentException e) {
+      throw new InvalidChecksumSizeException("The value "+type+" does not map"+
+        " to a valid checksum Type");
+    }
+  }
   
   /**
    * Writes the checksum header to the output stream <i>out</i>.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index 738f496c254..2d1cfc13552 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.hadoop.util.InvalidChecksumSizeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,13 +120,19 @@ public class BlockMetadataHeader {
     ByteBuffer buf = ByteBuffer.wrap(arr);
 
     while (buf.hasRemaining()) {
-      if (fc.read(buf, 0) <= 0) {
-        throw new EOFException("unexpected EOF while reading " +
-            "metadata file header");
+      if (fc.read(buf, buf.position()) <= 0) {
+        throw new CorruptMetaHeaderException("EOF while reading header from "+
+            "the metadata file. The meta file may be truncated or corrupt");
       }
     }
     short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
-    DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+    DataChecksum dataChecksum;
+    try {
+      dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+    } catch (InvalidChecksumSizeException e) {
+      throw new CorruptMetaHeaderException("The block meta file header is "+
+          "corrupt", e);
+    }
     return new BlockMetadataHeader(version, dataChecksum);
   }
 
@@ -136,7 +143,14 @@ public class BlockMetadataHeader {
    */
   public static BlockMetadataHeader readHeader(DataInputStream in)
       throws IOException {
-    return readHeader(in.readShort(), in);
+    try {
+      return readHeader(in.readShort(), in);
+    } catch (EOFException eof) {
+      // The attempt to read the header threw EOF, indicating there are not
+      // enough bytes in the meta file for the header.
+      throw new CorruptMetaHeaderException("EOF while reading header from meta"+
+          ". The meta file may be truncated or corrupt", eof);
+    }
   }
 
   /**
@@ -170,7 +184,13 @@ public class BlockMetadataHeader {
   // Version is already read.
   private static BlockMetadataHeader readHeader(short version,
       DataInputStream in) throws IOException {
-    DataChecksum checksum = DataChecksum.newDataChecksum(in);
+    DataChecksum checksum = null;
+    try {
+      checksum = DataChecksum.newDataChecksum(in);
+    } catch (InvalidChecksumSizeException e) {
+      throw new CorruptMetaHeaderException("The block meta file header is "+
+          "corrupt", e);
+    }
     return new BlockMetadataHeader(version, checksum);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CorruptMetaHeaderException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CorruptMetaHeaderException.java
new file mode 100644
index 00000000000..d6ea6ab00a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CorruptMetaHeaderException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Exception object that is thrown when the block metadata file is corrupt.
+ */
+public class CorruptMetaHeaderException extends IOException {
+
+  CorruptMetaHeaderException(String msg) {
+    super(msg);
+  }
+
+  CorruptMetaHeaderException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 517fcd756f8..2cb4100f431 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -325,13 +325,22 @@ class BlockSender implements java.io.Closeable {
             // storage.  The header is important for determining the checksum
             // type later when lazy persistence copies the block to non-transient
             // storage and computes the checksum.
+            int expectedHeaderSize = BlockMetadataHeader.getHeaderSize();
             if (!replica.isOnTransientStorage() &&
-                metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) {
+                metaIn.getLength() >= expectedHeaderSize) {
               checksumIn = new DataInputStream(new BufferedInputStream(
                   metaIn, IO_FILE_BUFFER_SIZE));
-  
+
               csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
               keepMetaInOpen = true;
+            } else if (!replica.isOnTransientStorage() &&
+                metaIn.getLength() < expectedHeaderSize) {
+              LOG.warn("The meta file length {} is less than the expected " +
+                  "header length {}, indicating the meta file is corrupt",
+                  metaIn.getLength(), expectedHeaderSize);
+              throw new CorruptMetaHeaderException("The meta file length "+
+                  metaIn.getLength()+" is less than the expected length "+
+                  expectedHeaderSize);
             }
           } else {
             LOG.warn("Could not find metadata file for " + block);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 9daffbfa8c5..73b762407ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -207,7 +207,6 @@ import org.apache.hadoop.tracing.TracerConfigurationManager;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.InvalidChecksumSizeException;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
@@ -3438,7 +3437,7 @@ public class DataNode extends ReconfigurableBase
   void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
 
     boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
-        || e instanceof InvalidChecksumSizeException);
+        || e instanceof CorruptMetaHeaderException);
 
     if (!isBadBlock) {
       return;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 55849f82857..3b049841fb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -647,6 +647,12 @@ class DataXceiver extends Receiver implements Runnable {
             dnR, block, remoteAddress, ioe);
         incrDatanodeNetworkErrors();
       }
+      // Normally the client reports a bad block to the NN. However, if the
+      // meta file is corrupt or a disk error occurs (EIO), the client never
+      // gets a chance to validate the data and hence never reports the
+      // block as bad. For some classes of IO exception, the DN should
+      // report the block as bad itself, via the handleBadBlock() method.
+      datanode.handleBadBlock(block, ioe, false);
       throw ioe;
     } finally {
       IOUtils.closeStream(blockSender);
@@ -1118,6 +1124,12 @@ class DataXceiver extends Receiver implements Runnable {
       isOpSuccess = false;
       LOG.info("opCopyBlock {} received exception {}", block, ioe.toString());
       incrDatanodeNetworkErrors();
+      // Normally the client reports a bad block to the NN. However, if the
+      // meta file is corrupt or a disk error occurs (EIO), the client never
+      // gets a chance to validate the data and hence never reports the
+      // block as bad. For some classes of IO exception, the DN should
+      // report the block as bad itself, via the handleBadBlock() method.
+      datanode.handleBadBlock(block, ioe, false);
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCorruptMetadataFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCorruptMetadataFile.java
new file mode 100644
index 00000000000..ccd146d0f6b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCorruptMetadataFile.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure that a block is not read successfully from a datanode
+ * when it has a corrupt metadata file.
+ */
+public class TestCorruptMetadataFile {
+
+  private MiniDFSCluster cluster;
+  private MiniDFSCluster.Builder clusterBuilder;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    // Reduce block acquire retries as we only have 1 DN and it allows the
+    // test to run faster
+    conf.setInt(
+        HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 1);
+    clusterBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testReadBlockFailsWhenMetaIsCorrupt() throws Exception {
+    cluster = clusterBuilder.build();
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    DataNode dn0 = cluster.getDataNodes().get(0);
+    Path filePath = new Path("test.dat");
+    FSDataOutputStream out = fs.create(filePath, (short) 1);
+    out.write(1);
+    out.hflush();
+    out.close();
+
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+    File metadataFile = cluster.getBlockMetadataFile(0, block);
+
+    // First ensure we can read the file OK
+    FSDataInputStream in = fs.open(filePath);
+    in.readByte();
+    in.close();
+
+    // Now truncate the meta file and ensure the data can no longer be read
+    RandomAccessFile raFile = new RandomAccessFile(metadataFile, "rw");
+    raFile.setLength(0);
+
+    FSDataInputStream intrunc = fs.open(filePath);
+    LambdaTestUtils.intercept(BlockMissingException.class,
+        () -> intrunc.readByte());
+    intrunc.close();
+
+    // Write 11 bytes to the meta file - longer than the header, but invalid
+    raFile.write("12345678901".getBytes());
+    assertEquals(11, raFile.length());
+
+    FSDataInputStream ininvalid = fs.open(filePath);
+    LambdaTestUtils.intercept(BlockMissingException.class,
+        () -> ininvalid.readByte());
+    ininvalid.close();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return cluster.getNameNode().getNamesystem()
+            .getBlockManager().getCorruptBlocks() == 1;
+      }
+    }, 100, 5000);
+
+    raFile.close();
+  }
+
+  /**
+   * This test creates a sample block meta file and then attempts to load it
+   * using BlockMetadataHeader, ensuring a valid file can be loaded and that
+   * a CorruptMetaHeaderException is thrown when the header is invalid.
+   * @throws Exception
+   */
+  @Test
+  public void testBlockMetaDataHeaderPReadHandlesCorruptMetaFile()
+      throws Exception {
+    File testDir = GenericTestUtils.getTestDir();
+    RandomAccessFile raFile = new RandomAccessFile(
+        new File(testDir, "metafile"), "rw");
+
+    // Write a valid header into the file
+    // Version
+    raFile.writeShort((short)1);
+    // Checksum type
+    raFile.writeByte(1);
+    // Bytes per checksum
+    raFile.writeInt(512);
+    // We should be able to get the header with no exceptions
+    BlockMetadataHeader header =
+        BlockMetadataHeader.preadHeader(raFile.getChannel());
+
+    // Now truncate the meta file to zero and ensure an exception is raised
+    raFile.setLength(0);
+    LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
+        () -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
+
+    // Now write a partial valid header to ensure an exception is thrown
+    // if the header cannot be fully read
+    // Version
+    raFile.writeShort((short)1);
+    // Checksum type
+    raFile.writeByte(1);
+
+    LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
+        () -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
+
+    // Finally write the expected 7 bytes, but with invalid content
+    raFile.setLength(0);
+    raFile.write("1234567".getBytes());
+
+    LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
+        () -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
+
+    raFile.close();
+  }
+}