diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 74c1025171f..a308b49f175 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -183,6 +183,8 @@ class BlockSender implements java.io.Closeable { // would risk sending too much unnecessary data. 512 (1 disk sector) // is likely to result in minimal extra IO. private static final long CHUNK_SIZE = 512; + + private static final String EIO_ERROR = "Input/output error"; /** * Constructor * @@ -576,7 +578,14 @@ class BlockSender implements java.io.Closeable { int dataOff = checksumOff + checksumDataLen; if (!transferTo) { // normal transfer - ris.readDataFully(buf, dataOff, dataLen); + try { + ris.readDataFully(buf, dataOff, dataLen); + } catch (IOException ioe) { + if (ioe.getMessage().startsWith(EIO_ERROR)) { + throw new DiskFileCorruptException("A disk IO error occurred", ioe); + } + throw ioe; + } if (verifyChecksum) { verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); @@ -623,6 +632,13 @@ class BlockSender implements java.io.Closeable { * It was done here because the NIO throws an IOException for EPIPE. */ String ioem = e.getMessage(); + /* + * If we got an EIO when reading files or transferTo the client socket, + * it's very likely caused by bad disk track or other file corruptions. + */ + if (ioem.startsWith(EIO_ERROR)) { + throw new DiskFileCorruptException("A disk IO error occurred", e); + } if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); datanode.getBlockScanner().markSuspectBlock( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c8f6896820d..e620a2b6883 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2610,13 +2610,7 @@ public class DataNode extends ReconfigurableBase metrics.incrBlocksReplicated(); } } catch (IOException ie) { - if (ie instanceof InvalidChecksumSizeException) { - // Add the block to the front of the scanning queue if metadata file - // is corrupt. We already add the block to front of scanner if the - // peer disconnects. - LOG.info("Adding block: {} for scanning", b); - blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b); - } + handleBadBlock(b, ie, false); LOG.warn("{}:Failed to transfer {} to {} got", bpReg, b, targets[0], ie); } finally { @@ -3462,6 +3456,41 @@ public class DataNode extends ReconfigurableBase handleDiskError(sb.toString()); } + /** + * A bad block need to be handled, either to add to blockScanner suspect queue + * or report to NameNode directly. + * + * If the method is called by scanner, then the block must be a bad block, we + * report it to NameNode directly. Otherwise if we judge it as a bad block + * according to exception type, then we try to add the bad block to + * blockScanner suspect queue if blockScanner is enabled, or report to + * NameNode directly otherwise. + * + * @param block The suspicious block + * @param e The exception encountered when accessing the block + * @param fromScanner Is it from blockScanner. The blockScanner will call this + * method only when it's sure that the block is corrupt. + */ + void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) { + + boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException + || e instanceof InvalidChecksumSizeException); + + if (!isBadBlock) { + return; + } + if (!fromScanner && blockScanner.isEnabled()) { + blockScanner.markSuspectBlock(data.getVolume(block).getStorageID(), + block); + } else { + try { + reportBadBlocks(block); + } catch (IOException ie) { + LOG.warn("report bad block {} failed", block, ie); + } + } + } + @VisibleForTesting public long getLastDiskErrorCheck() { return lastDiskErrorCheck; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java new file mode 100644 index 00000000000..1a70fe78ea7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; + +/** + * When kernel report a "Input/output error", we use this exception to + * represents some corruption(e.g. bad disk track) happened on some disk file. + */ +public class DiskFileCorruptException extends IOException { + /** + * Instantiate. + * @param msg the exception message + * @param cause the underlying cause + */ + public DiskFileCorruptException(String msg, Throwable cause) { + super(msg, cause); + } + + public DiskFileCorruptException(String msg) { + super(msg); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index 3bdb1d7158a..84cfb04801d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -290,12 +290,7 @@ public class VolumeScanner extends Thread { return; } LOG.warn("Reporting bad {} on {}", block, volume); - try { - scanner.datanode.reportBadBlocks(block, volume); - } catch (IOException ie) { - // This is bad, but not bad enough to shut down the scanner. - LOG.warn("Cannot report bad block " + block, ie); - } + scanner.datanode.handleBadBlock(block, e, true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java index 7e51ed958c4..6af71067370 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java @@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.any; import com.google.common.base.Supplier; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -53,7 +54,9 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; @@ -154,6 +157,67 @@ public class TestReplication { assertTrue(!fileSys.exists(name)); } + private static class CorruptFileSimulatedFSDataset + extends SimulatedFSDataset { + /** + * Simulated input and output streams. + * + */ + static private class CorruptFileSimulatedInputStream + extends java.io.InputStream { + private InputStream inputStream; + + CorruptFileSimulatedInputStream(InputStream is) { + inputStream = is; + } + + @Override + public int read() throws IOException { + int ret = inputStream.read(); + if (ret > 0) { + throw new IOException("Input/output error"); + } + return ret; + } + + @Override + public int read(byte[] b) throws IOException { + int ret = inputStream.read(b); + if (ret > 0) { + throw new IOException("Input/output error"); + } + return ret; + } + } + + CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage, + Configuration conf) { + super(storage, conf); + } + + @Override + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { + InputStream result = super.getBlockInputStream(b); + IOUtils.skipFully(result, seekOffset); + return new CorruptFileSimulatedInputStream(result); + } + + static class Factory + extends FsDatasetSpi.Factory { + @Override + public CorruptFileSimulatedFSDataset newInstance(DataNode datanode, + DataStorage storage, Configuration conf) throws IOException { + return new CorruptFileSimulatedFSDataset(datanode, storage, conf); + } + + @Override + public boolean isSimulated() { + return true; + } + } + } + private void testBadBlockReportOnTransfer( boolean corruptBlockByDeletingBlockFile) throws Exception { Configuration conf = new HdfsConfiguration(); @@ -205,6 +269,53 @@ public class TestReplication { cluster.shutdown(); } + @Test(timeout = 30000) + public void testBadBlockReportOnTransferCorruptFile() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, + CorruptFileSimulatedFSDataset.Factory.class.getName()); + // Disable BlockScanner to trigger reportBadBlocks + conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L); + FileSystem fs; + int replicaCount = 0; + short replFactor = 1; + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + try { + fs = cluster.getFileSystem(); + final DFSClient dfsClient = new DFSClient( + new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); + + // Create file with replication factor of 1 + Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1"); + DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0); + DFSTestUtil.waitReplication(fs, file1, replFactor); + + // Increase replication factor, this should invoke transfer request + // Receiving datanode fails on checksum and reports it to namenode + replFactor = 2; + fs.setReplication(file1, replFactor); + + // Now get block details and check if the block is corrupt + GenericTestUtils.waitFor(() -> { + try { + return dfsClient.getNamenode() + .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0) + .isCorrupt(); + } catch (IOException ie) { + return false; + } + }, 1000, 15000); + replicaCount = dfsClient.getNamenode() + .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0) + .getLocations().length; + assertEquals("replication should not success", 1, replicaCount); + } finally { + cluster.shutdown(); + } + } + /* * Test if Datanode reports bad blocks during replication request */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index c3996736a17..f33332f0302 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -598,7 +598,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override public StorageType getStorageType() { - return null; + return StorageType.DISK; } @Override @@ -1178,7 +1178,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { return new ReplicaHandler(binfo, null); } - protected synchronized InputStream getBlockInputStream(ExtendedBlock b) + public synchronized InputStream getBlockInputStream(ExtendedBlock b) throws IOException { BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index 38e4287020d..0a589a9ea7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -240,10 +240,10 @@ public class TestDiskError { @Test public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException { DataNode dn0 = cluster.getDataNodes().get(0); - // Make a mock blockScanner class and return false whenever isEnabled is + // Make a mock blockScanner class and return true whenever isEnabled is // called on blockScanner BlockScanner mockScanner = Mockito.mock(BlockScanner.class); - Mockito.when(mockScanner.isEnabled()).thenReturn(false); + Mockito.when(mockScanner.isEnabled()).thenReturn(true); dn0.setBlockScanner(mockScanner); Path filePath = new Path("test.dat"); FSDataOutputStream out = fs.create(filePath, (short) 1);