HDFS-13709. Report bad block to NN when transfer block encounter EIO exception. Contributed by Chen Zhang.

(cherry picked from commit 360a96f342)
Wei-Chiu Chuang 2019-08-19 13:08:55 -07:00
parent abda503040
commit 6d93886231
7 changed files with 208 additions and 18 deletions

BlockSender.java

@@ -182,6 +182,8 @@ class BlockSender implements java.io.Closeable {
   // would risk sending too much unnecessary data. 512 (1 disk sector)
   // is likely to result in minimal extra IO.
   private static final long CHUNK_SIZE = 512;
+  private static final String EIO_ERROR = "Input/output error";
+
   /**
    * Constructor
    *
@@ -599,7 +601,14 @@ class BlockSender implements java.io.Closeable {
       int dataOff = checksumOff + checksumDataLen;
       if (!transferTo) { // normal transfer
-        ris.readDataFully(buf, dataOff, dataLen);
+        try {
+          ris.readDataFully(buf, dataOff, dataLen);
+        } catch (IOException ioe) {
+          if (ioe.getMessage().startsWith(EIO_ERROR)) {
+            throw new DiskFileCorruptException("A disk IO error occurred", ioe);
+          }
+          throw ioe;
+        }
 
         if (verifyChecksum) {
           verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@@ -646,6 +655,13 @@ class BlockSender implements java.io.Closeable {
        * It was done here because the NIO throws an IOException for EPIPE.
        */
       String ioem = e.getMessage();
+      /*
+       * If we get an EIO while reading a file or during transferTo of the
+       * client socket, it is very likely caused by a bad disk track or
+       * other file corruption.
+       */
+      if (ioem.startsWith(EIO_ERROR)) {
+        throw new DiskFileCorruptException("A disk IO error occurred", e);
+      }
       if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
         LOG.error("BlockSender.sendChunks() exception: ", e);
         datanode.getBlockScanner().markSuspectBlock(
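Both catch sites above rely on the same idiom: the JVM surfaces a kernel EIO
from a failed read() or transferTo() as a plain IOException whose message
begins with "Input/output error", so a message-prefix check is the only signal
available at this layer. A minimal sketch of that classification (the helper
name is hypothetical, and it adds a null guard the patch omits, since
IOException.getMessage() may return null):

    // Hypothetical helper illustrating the EIO-classification idiom.
    private static IOException classifyDiskError(IOException ioe) {
      String msg = ioe.getMessage();
      if (msg != null && msg.startsWith(EIO_ERROR)) {
        // Wrap so callers such as DataNode#handleBadBlock can branch on type.
        return new DiskFileCorruptException("A disk IO error occurred", ioe);
      }
      return ioe; // EPIPE, ECONNRESET, etc. keep their existing handling
    }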

DataNode.java

@@ -2592,13 +2592,7 @@ public class DataNode extends ReconfigurableBase
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        if (ie instanceof InvalidChecksumSizeException) {
-          // Add the block to the front of the scanning queue if metadata file
-          // is corrupt. We already add the block to front of scanner if the
-          // peer disconnects.
-          LOG.info("Adding block: {} for scanning", b);
-          blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
-        }
+        handleBadBlock(b, ie, false);
         LOG.warn("{}:Failed to transfer {} to {} got",
             bpReg, b, targets[0], ie);
       } finally {
@@ -3426,6 +3420,41 @@ public class DataNode extends ReconfigurableBase
     handleDiskError(sb.toString());
   }
 
+  /**
+   * Handle a bad block: either add it to the blockScanner suspect queue
+   * or report it to the NameNode directly.
+   *
+   * If this method is called by the scanner, the block is known to be bad
+   * and we report it to the NameNode directly. Otherwise, if the exception
+   * type indicates a bad block, we add it to the blockScanner suspect queue
+   * when the blockScanner is enabled, or report it to the NameNode directly
+   * when it is not.
+   *
+   * @param block The suspicious block
+   * @param e The exception encountered when accessing the block
+   * @param fromScanner Whether the call comes from the blockScanner; the
+   *          blockScanner calls this method only when it is sure that the
+   *          block is corrupt.
+   */
+  void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
+    boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
+        || e instanceof InvalidChecksumSizeException);
+    if (!isBadBlock) {
+      return;
+    }
+    if (!fromScanner && blockScanner.isEnabled()) {
+      blockScanner.markSuspectBlock(data.getVolume(block).getStorageID(),
+          block);
+    } else {
+      try {
+        reportBadBlocks(block);
+      } catch (IOException ie) {
+        LOG.warn("report bad block {} failed", block, ie);
+      }
+    }
+  }
+
   @VisibleForTesting
   public long getLastDiskErrorCheck() {
     return lastDiskErrorCheck;
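The routing in handleBadBlock reduces to a small decision table; a sketch, with
the predicate pulled into a hypothetical helper purely for illustration:

    // fromScanner == true                     -> report to NameNode (scanner already verified)
    // EIO / bad checksum, scanner enabled     -> markSuspectBlock for re-verification
    // EIO / bad checksum, scanner disabled    -> report to NameNode directly
    // any other IOException                   -> not treated as a bad block here
    static boolean isDefinitelyBad(IOException e, boolean fromScanner) {
      return fromScanner
          || e instanceof DiskFileCorruptException
          || e instanceof InvalidChecksumSizeException;
    }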

DiskFileCorruptException.java (new file)

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * When the kernel reports an "Input/output error", we use this exception to
+ * represent a corruption (e.g. a bad disk track) in a disk file.
+ */
+public class DiskFileCorruptException extends IOException {
+  /**
+   * Instantiate.
+   * @param msg the exception message
+   * @param cause the underlying cause
+   */
+  public DiskFileCorruptException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public DiskFileCorruptException(String msg) {
+    super(msg);
+  }
+}
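Because it subclasses IOException, existing catch blocks keep working while new
code can branch on the specific type. A minimal sketch (the sendBlock call site
is illustrative, not part of this patch):

    try {
      blockSender.sendBlock(out, baseStream, throttler);
    } catch (DiskFileCorruptException dfce) {
      // Definitely a bad replica: report straight to the NameNode.
    } catch (IOException ioe) {
      // Transient network/pipeline errors keep their existing handling.
    }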

VolumeScanner.java

@@ -290,12 +290,7 @@ public class VolumeScanner extends Thread {
       return;
     }
     LOG.warn("Reporting bad {} on {}", block, volume);
-    try {
-      scanner.datanode.reportBadBlocks(block, volume);
-    } catch (IOException ie) {
-      // This is bad, but not bad enough to shut down the scanner.
-      LOG.warn("Cannot report bad block " + block, ie);
-    }
+    scanner.datanode.handleBadBlock(block, e, true);
   }
 }

TestReplication.java

@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import com.google.common.base.Supplier;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;

@@ -52,7 +53,9 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

@@ -155,6 +158,67 @@ public class TestReplication {
     assertTrue(!fileSys.exists(name));
   }
 
+  private static class CorruptFileSimulatedFSDataset
+      extends SimulatedFSDataset {
+    /**
+     * Simulated input stream that fails every read with an EIO-style
+     * "Input/output error", mimicking a corrupt disk file.
+     */
+    static private class CorruptFileSimulatedInputStream
+        extends java.io.InputStream {
+      private InputStream inputStream;
+
+      CorruptFileSimulatedInputStream(InputStream is) {
+        inputStream = is;
+      }
+
+      @Override
+      public int read() throws IOException {
+        int ret = inputStream.read();
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+
+      @Override
+      public int read(byte[] b) throws IOException {
+        int ret = inputStream.read(b);
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+    }
+
+    CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return new CorruptFileSimulatedInputStream(result);
+    }
+
+    static class Factory
+        extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
+      @Override
+      public CorruptFileSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
   private void testBadBlockReportOnTransfer(
       boolean corruptBlockByDeletingBlockFile) throws Exception {
     Configuration conf = new HdfsConfiguration();

@@ -206,6 +270,53 @@ public class TestReplication {
     cluster.shutdown();
   }
 
+  @Test(timeout = 30000)
+  public void testBadBlockReportOnTransferCorruptFile() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        CorruptFileSimulatedFSDataset.Factory.class.getName());
+    // Disable the BlockScanner so bad blocks are reported to the NameNode
+    // directly rather than queued as suspects.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
+    FileSystem fs;
+    int replicaCount = 0;
+    short replFactor = 1;
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    try {
+      fs = cluster.getFileSystem();
+      final DFSClient dfsClient = new DFSClient(
+          new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+      // Create a file with a replication factor of 1.
+      Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+      DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+      DFSTestUtil.waitReplication(fs, file1, replFactor);
+      // Increase the replication factor; this triggers a transfer request.
+      // The transferring datanode hits an EIO while reading the replica and
+      // reports the bad block to the NameNode.
+      replFactor = 2;
+      fs.setReplication(file1, replFactor);
+      // Now get the block details and check that the block is marked corrupt.
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return dfsClient.getNamenode()
+              .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+              .isCorrupt();
+        } catch (IOException ie) {
+          return false;
+        }
+      }, 1000, 15000);
+      replicaCount = dfsClient.getNamenode()
+          .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+          .getLocations().length;
+      assertEquals("replication should not succeed", 1, replicaCount);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
   /*
    * Test if Datanode reports bad blocks during replication request
    */

SimulatedFSDataset.java

@@ -577,7 +577,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     public StorageType getStorageType() {
-      return null;
+      return StorageType.DISK;
     }
 
     @Override

@@ -1161,7 +1161,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return new ReplicaHandler(binfo, null);
   }
 
-  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
     BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {

TestDiskError.java

@@ -240,10 +240,10 @@ public class TestDiskError {
   @Test
   public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
     DataNode dn0 = cluster.getDataNodes().get(0);
-    // Make a mock blockScanner class and return false whenever isEnabled is
-    // called on blockScanner
+    // Make a mock blockScanner class and return true whenever isEnabled is
+    // called on blockScanner
     BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
-    Mockito.when(mockScanner.isEnabled()).thenReturn(false);
+    Mockito.when(mockScanner.isEnabled()).thenReturn(true);
     dn0.setBlockScanner(mockScanner);
     Path filePath = new Path("test.dat");
     FSDataOutputStream out = fs.create(filePath, (short) 1);
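With isEnabled() stubbed to true, a failed transfer is now expected to route
through the scanner's suspect queue rather than an immediate NameNode RPC. A
hypothetical follow-up assertion (not part of this patch) could verify that:

    // Assumes the mock from the hunk above; markSuspectBlock(String,
    // ExtendedBlock) is what DataNode#handleBadBlock calls when the
    // scanner is enabled.
    Mockito.verify(mockScanner, Mockito.atLeastOnce())
        .markSuspectBlock(Mockito.anyString(), Mockito.any(ExtendedBlock.class));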