HDFS-7235. DataNode#transferBlock should report blocks that don't exist using reportBadBlock (yzhang via cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2014-10-28 16:41:22 -07:00
parent 8984e9b177
commit ac9ab037e9
8 changed files with 271 additions and 40 deletions

View File

@ -362,6 +362,10 @@ Release 2.7.0 - UNRELEASED
TestBlockReaderFactory failures resulting from TemporarySocketDirectory GC. TestBlockReaderFactory failures resulting from TemporarySocketDirectory GC.
(Jinghui Wang via Colin Patrick McCabe) (Jinghui Wang via Colin Patrick McCabe)
HDFS-7235. DataNode#transferBlock should report blocks that don't exist
using reportBadBlock (yzhang via cmccabe)
Release 2.6.0 - UNRELEASED Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -56,7 +56,9 @@ import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -1768,30 +1770,59 @@ public class DataNode extends ReconfigurableBase
int getXmitsInProgress() { int getXmitsInProgress() {
return xmitsInProgress.get(); return xmitsInProgress.get();
} }
private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
FsVolumeSpi volume = getFSDataset().getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
LOG.warn(msg);
}
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
StorageType[] xferTargetStorageTypes) throws IOException { StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block); BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
if (!data.isValidBlock(block)) { boolean replicaNotExist = false;
// block does not exist or is under-construction boolean replicaStateNotFinalized = false;
boolean blockFileNotExist = false;
boolean lengthTooShort = false;
try {
data.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
} catch (ReplicaNotFoundException e) {
replicaNotExist = true;
} catch (UnexpectedReplicaStateException e) {
replicaStateNotFinalized = true;
} catch (FileNotFoundException e) {
blockFileNotExist = true;
} catch (EOFException e) {
lengthTooShort = true;
} catch (IOException e) {
// The IOException indicates not being able to access block file,
// treat it the same here as blockFileNotExist, to trigger
// reporting it as a bad block
blockFileNotExist = true;
}
if (replicaNotExist || replicaStateNotFinalized) {
String errStr = "Can't send invalid block " + block; String errStr = "Can't send invalid block " + block;
LOG.info(errStr); LOG.info(errStr);
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr); bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr);
return; return;
} }
if (blockFileNotExist) {
// Check if NN recorded length matches on-disk length // Report back to NN bad block caused by non-existent block file.
long onDiskLength = data.getLength(block); reportBadBlock(bpos, block, "Can't replicate block " + block
if (block.getNumBytes() > onDiskLength) { + " because the block file doesn't exist, or is not accessible");
FsVolumeSpi volume = getFSDataset().getVolume(block); return;
}
if (lengthTooShort) {
// Check if NN recorded length matches on-disk length
// Shorter on-disk len indicates corruption so report NN the corrupt block // Shorter on-disk len indicates corruption so report NN the corrupt block
bpos.reportBadBlocks( reportBadBlock(bpos, block, "Can't replicate block " + block
block, volume.getStorageID(), volume.getStorageType()); + " because on-disk length " + data.getLength(block)
LOG.warn("Can't replicate block " + block
+ " because on-disk length " + onDiskLength
+ " is shorter than NameNode recorded length " + block.getNumBytes()); + " is shorter than NameNode recorded length " + block.getNumBytes());
return; return;
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
/**
* Exception indicating that the replica is in an unexpected state
*/
public class UnexpectedReplicaStateException extends IOException {
private static final long serialVersionUID = 1L;
public UnexpectedReplicaStateException() {
super();
}
public UnexpectedReplicaStateException(ExtendedBlock b,
ReplicaState expectedState) {
super("Replica " + b + " is not in expected state " + expectedState);
}
public UnexpectedReplicaStateException(String msg) {
super(msg);
}
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset; package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection; import java.util.Collection;
@ -35,12 +37,15 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@ -298,6 +303,29 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** Does the dataset contain the block? */ /** Does the dataset contain the block? */
public boolean contains(ExtendedBlock block); public boolean contains(ExtendedBlock block);
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
*
* @throws IOException May be thrown from the methods called.
*/
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException;
/** /**
* Is the block valid? * Is the block valid?
* @return - true if the specified block is valid * @return - true if the specified block is valid

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -80,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@ -1450,6 +1452,45 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return finalized; return finalized;
} }
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
*
* @throws IOException May be thrown from the methods called.
*/
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo == null) {
throw new ReplicaNotFoundException(b);
}
if (replicaInfo.getState() != state) {
throw new UnexpectedReplicaStateException(b,state);
}
if (!replicaInfo.getBlockFile().exists()) {
throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
}
long onDiskLength = getLength(b);
if (onDiskLength < minLength) {
throw new EOFException(b + "'s on-disk length " + onDiskLength
+ " is shorter than minLength " + minLength);
}
}
/** /**
* Check whether the given block is a valid one. * Check whether the given block is a valid one.
* valid means finalized * valid means finalized
@ -1458,7 +1499,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public boolean isValidBlock(ExtendedBlock b) { public boolean isValidBlock(ExtendedBlock b) {
return isValid(b, ReplicaState.FINALIZED); return isValid(b, ReplicaState.FINALIZED);
} }
/** /**
* Check whether the given block is a valid RBW. * Check whether the given block is a valid RBW.
*/ */
@ -1469,11 +1510,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/** Does the block exist and have the given state? */ /** Does the block exist and have the given state? */
private boolean isValid(final ExtendedBlock b, final ReplicaState state) { private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), try {
b.getLocalBlock()); checkBlock(b, 0, state);
return replicaInfo != null } catch (IOException e) {
&& replicaInfo.getState() == state return false;
&& replicaInfo.getBlockFile().exists(); }
return true;
} }
/** /**

View File

@ -1822,23 +1822,40 @@ public class MiniDFSCluster {
} }
} }
/** private int corruptBlockOnDataNodesHelper(ExtendedBlock block,
* Return the contents of the given block on the given datanode. boolean deleteBlockFile) throws IOException {
*
* @param block block to be corrupted
* @throws IOException on error accessing the file for the given block
*/
public int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{
int blocksCorrupted = 0; int blocksCorrupted = 0;
File[] blockFiles = getAllBlockFiles(block); File[] blockFiles = getAllBlockFiles(block);
for (File f : blockFiles) { for (File f : blockFiles) {
if (corruptBlock(f)) { if ((deleteBlockFile && corruptBlockByDeletingBlockFile(f)) ||
(!deleteBlockFile && corruptBlock(f))) {
blocksCorrupted++; blocksCorrupted++;
} }
} }
return blocksCorrupted; return blocksCorrupted;
} }
/**
* Return the number of corrupted replicas of the given block.
*
* @param block block to be corrupted
* @throws IOException on error accessing the file for the given block
*/
public int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{
return corruptBlockOnDataNodesHelper(block, false);
}
/**
* Return the number of corrupted replicas of the given block.
*
* @param block block to be corrupted
* @throws IOException on error accessing the file for the given block
*/
public int corruptBlockOnDataNodesByDeletingBlockFile(ExtendedBlock block)
throws IOException{
return corruptBlockOnDataNodesHelper(block, true);
}
public String readBlockOnDataNode(int i, ExtendedBlock block) public String readBlockOnDataNode(int i, ExtendedBlock block)
throws IOException { throws IOException {
assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i; assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
@ -1884,7 +1901,18 @@ public class MiniDFSCluster {
LOG.warn("Corrupting the block " + blockFile); LOG.warn("Corrupting the block " + blockFile);
return true; return true;
} }
/*
* Corrupt a block on a particular datanode by deleting the block file
*/
public static boolean corruptBlockByDeletingBlockFile(File blockFile)
throws IOException {
if (blockFile == null || !blockFile.exists()) {
return false;
}
return blockFile.delete();
}
public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk, public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
long newGenStamp) throws IOException { long newGenStamp) throws IOException {
File blockFile = getBlockFile(dnIndex, blk); File blockFile = getBlockFile(dnIndex, blk);

View File

@ -136,11 +136,8 @@ public class TestReplication {
assertTrue(!fileSys.exists(name)); assertTrue(!fileSys.exists(name));
} }
/* private void testBadBlockReportOnTransfer(
* Test if Datanode reports bad blocks during replication request boolean corruptBlockByDeletingBlockFile) throws Exception {
*/
@Test
public void testBadBlockReportOnTransfer() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
FileSystem fs = null; FileSystem fs = null;
DFSClient dfsClient = null; DFSClient dfsClient = null;
@ -161,7 +158,11 @@ public class TestReplication {
// Corrupt the block belonging to the created file // Corrupt the block belonging to the created file
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block); int blockFilesCorrupted =
corruptBlockByDeletingBlockFile?
cluster.corruptBlockOnDataNodesByDeletingBlockFile(block) :
cluster.corruptBlockOnDataNodes(block);
assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted); assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted);
// Increase replication factor, this should invoke transfer request // Increase replication factor, this should invoke transfer request
@ -185,7 +186,24 @@ public class TestReplication {
assertTrue(replicaCount == 1); assertTrue(replicaCount == 1);
cluster.shutdown(); cluster.shutdown();
} }
/*
* Test if Datanode reports bad blocks during replication request
*/
@Test
public void testBadBlockReportOnTransfer() throws Exception {
testBadBlockReportOnTransfer(false);
}
/*
* Test if Datanode reports bad blocks during replication request
* with missing block file
*/
@Test
public void testBadBlockReportOnTransferMissingBlockFile() throws Exception {
testBadBlockReportOnTransfer(true);
}
/** /**
* Tests replication in DFS. * Tests replication in DFS.
*/ */

View File

@ -724,17 +724,52 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return getBInfo(block) != null; return getBInfo(block) != null;
} }
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
*/
@Override // {@link FsDatasetSpi}
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException {
final BInfo binfo = getBInfo(b);
if (binfo == null) {
throw new ReplicaNotFoundException(b);
}
if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) ||
(state != ReplicaState.FINALIZED && binfo.isFinalized())) {
throw new UnexpectedReplicaStateException(b,state);
}
}
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized boolean isValidBlock(ExtendedBlock b) { public synchronized boolean isValidBlock(ExtendedBlock b) {
final BInfo binfo = getBInfo(b); try {
return binfo != null && binfo.isFinalized(); checkBlock(b, 0, ReplicaState.FINALIZED);
} catch (IOException e) {
return false;
}
return true;
} }
/* check if a block is created but not finalized */ /* check if a block is created but not finalized */
@Override @Override
public synchronized boolean isValidRbw(ExtendedBlock b) { public synchronized boolean isValidRbw(ExtendedBlock b) {
final BInfo binfo = getBInfo(b); try {
return binfo != null && !binfo.isFinalized(); checkBlock(b, 0, ReplicaState.RBW);
} catch (IOException e) {
return false;
}
return true;
} }
@Override @Override