HDFS-13709. Report bad block to NN when transfer block encounter EIO exception. Contributed by Chen Zhang.

This commit is contained in:
Wei-Chiu Chuang 2019-08-19 13:08:55 -07:00
parent abae6ff2a2
commit 360a96f342
7 changed files with 208 additions and 18 deletions

View File

@ -183,6 +183,8 @@ class BlockSender implements java.io.Closeable {
// would risk sending too much unnecessary data. 512 (1 disk sector)
// is likely to result in minimal extra IO.
private static final long CHUNK_SIZE = 512;
private static final String EIO_ERROR = "Input/output error";
/**
* Constructor
*
@ -576,7 +578,14 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
try {
ris.readDataFully(buf, dataOff, dataLen);
} catch (IOException ioe) {
if (ioe.getMessage().startsWith(EIO_ERROR)) {
throw new DiskFileCorruptException("A disk IO error occurred", ioe);
}
throw ioe;
}
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@ -623,6 +632,13 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
* It was done here because the NIO throws an IOException for EPIPE.
*/
String ioem = e.getMessage();
/*
* If we got an EIO when reading files or transferTo the client socket,
* it's very likely caused by bad disk track or other file corruptions.
*/
if (ioem.startsWith(EIO_ERROR)) {
throw new DiskFileCorruptException("A disk IO error occurred", e);
}
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
datanode.getBlockScanner().markSuspectBlock(

View File

@ -2610,13 +2610,7 @@ public void run() {
metrics.incrBlocksReplicated();
}
} catch (IOException ie) {
if (ie instanceof InvalidChecksumSizeException) {
// Add the block to the front of the scanning queue if metadata file
// is corrupt. We already add the block to front of scanner if the
// peer disconnects.
LOG.info("Adding block: {} for scanning", b);
blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
}
handleBadBlock(b, ie, false);
LOG.warn("{}:Failed to transfer {} to {} got",
bpReg, b, targets[0], ie);
} finally {
@ -3462,6 +3456,41 @@ private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
handleDiskError(sb.toString());
}
/**
* A bad block need to be handled, either to add to blockScanner suspect queue
* or report to NameNode directly.
*
* If the method is called by scanner, then the block must be a bad block, we
* report it to NameNode directly. Otherwise if we judge it as a bad block
* according to exception type, then we try to add the bad block to
* blockScanner suspect queue if blockScanner is enabled, or report to
* NameNode directly otherwise.
*
* @param block The suspicious block
* @param e The exception encountered when accessing the block
* @param fromScanner Is it from blockScanner. The blockScanner will call this
* method only when it's sure that the block is corrupt.
*/
void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
|| e instanceof InvalidChecksumSizeException);
if (!isBadBlock) {
return;
}
if (!fromScanner && blockScanner.isEnabled()) {
blockScanner.markSuspectBlock(data.getVolume(block).getStorageID(),
block);
} else {
try {
reportBadBlocks(block);
} catch (IOException ie) {
LOG.warn("report bad block {} failed", block, ie);
}
}
}
@VisibleForTesting
public long getLastDiskErrorCheck() {
return lastDiskErrorCheck;

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
/**
* When kernel report a "Input/output error", we use this exception to
* represents some corruption(e.g. bad disk track) happened on some disk file.
*/
public class DiskFileCorruptException extends IOException {
/**
* Instantiate.
* @param msg the exception message
* @param cause the underlying cause
*/
public DiskFileCorruptException(String msg, Throwable cause) {
super(msg, cause);
}
public DiskFileCorruptException(String msg) {
super(msg);
}
}

View File

@ -290,12 +290,7 @@ public void handle(ExtendedBlock block, IOException e) {
return;
}
LOG.warn("Reporting bad {} on {}", block, volume);
try {
scanner.datanode.reportBadBlocks(block, volume);
} catch (IOException ie) {
// This is bad, but not bad enough to shut down the scanner.
LOG.warn("Cannot report bad block " + block, ie);
}
scanner.datanode.handleBadBlock(block, e, true);
}
}

View File

@ -27,6 +27,7 @@
import com.google.common.base.Supplier;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -53,7 +54,9 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@ -154,6 +157,67 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(!fileSys.exists(name));
}
private static class CorruptFileSimulatedFSDataset
extends SimulatedFSDataset {
/**
* Simulated input and output streams.
*
*/
static private class CorruptFileSimulatedInputStream
extends java.io.InputStream {
private InputStream inputStream;
CorruptFileSimulatedInputStream(InputStream is) {
inputStream = is;
}
@Override
public int read() throws IOException {
int ret = inputStream.read();
if (ret > 0) {
throw new IOException("Input/output error");
}
return ret;
}
@Override
public int read(byte[] b) throws IOException {
int ret = inputStream.read(b);
if (ret > 0) {
throw new IOException("Input/output error");
}
return ret;
}
}
CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage,
Configuration conf) {
super(storage, conf);
}
@Override
public synchronized InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
InputStream result = super.getBlockInputStream(b);
IOUtils.skipFully(result, seekOffset);
return new CorruptFileSimulatedInputStream(result);
}
static class Factory
extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
@Override
public CorruptFileSimulatedFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
}
@Override
public boolean isSimulated() {
return true;
}
}
}
private void testBadBlockReportOnTransfer(
boolean corruptBlockByDeletingBlockFile) throws Exception {
Configuration conf = new HdfsConfiguration();
@ -205,6 +269,53 @@ private void testBadBlockReportOnTransfer(
cluster.shutdown();
}
@Test(timeout = 30000)
public void testBadBlockReportOnTransferCorruptFile() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
CorruptFileSimulatedFSDataset.Factory.class.getName());
// Disable BlockScanner to trigger reportBadBlocks
conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
FileSystem fs;
int replicaCount = 0;
short replFactor = 1;
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
try {
fs = cluster.getFileSystem();
final DFSClient dfsClient = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
// Create file with replication factor of 1
Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
DFSTestUtil.waitReplication(fs, file1, replFactor);
// Increase replication factor, this should invoke transfer request
// Receiving datanode fails on checksum and reports it to namenode
replFactor = 2;
fs.setReplication(file1, replFactor);
// Now get block details and check if the block is corrupt
GenericTestUtils.waitFor(() -> {
try {
return dfsClient.getNamenode()
.getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
.isCorrupt();
} catch (IOException ie) {
return false;
}
}, 1000, 15000);
replicaCount = dfsClient.getNamenode()
.getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
.getLocations().length;
assertEquals("replication should not success", 1, replicaCount);
} finally {
cluster.shutdown();
}
}
/*
* Test if Datanode reports bad blocks during replication request
*/

View File

@ -598,7 +598,7 @@ public long getAvailable() throws IOException {
@Override
public StorageType getStorageType() {
return null;
return StorageType.DISK;
}
@Override
@ -1178,7 +1178,7 @@ public synchronized ReplicaHandler createTemporary(StorageType storageType,
return new ReplicaHandler(binfo, null);
}
protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
public synchronized InputStream getBlockInputStream(ExtendedBlock b)
throws IOException {
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {

View File

@ -240,10 +240,10 @@ public Boolean get() {
@Test
public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
DataNode dn0 = cluster.getDataNodes().get(0);
// Make a mock blockScanner class and return false whenever isEnabled is
// Make a mock blockScanner class and return true whenever isEnabled is
// called on blockScanner
BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
Mockito.when(mockScanner.isEnabled()).thenReturn(false);
Mockito.when(mockScanner.isEnabled()).thenReturn(true);
dn0.setBlockScanner(mockScanner);
Path filePath = new Path("test.dat");
FSDataOutputStream out = fs.create(filePath, (short) 1);