HDFS-7235. DataNode#transferBlock should report blocks that don't exist using reportBadBlock (yzhang via cmccabe)

(cherry picked from commit ac9ab037e9)

(cherry picked from commit 842a54a5f6)
(cherry picked from commit 1aa9e34c5106c496ffd390f6b2c822d387fb1908)
This commit is contained in:
Colin Patrick Mccabe 2014-10-28 16:41:22 -07:00 committed by Vinod Kumar Vavilapalli
parent dda8e0e328
commit d3a7769bff
8 changed files with 270 additions and 40 deletions

View File

@ -35,6 +35,9 @@ Release 2.6.1 - UNRELEASED
HDFS-8486. DN startup may cause severe data loss. (daryn via cmccabe) HDFS-8486. DN startup may cause severe data loss. (daryn via cmccabe)
HDFS-7235. DataNode#transferBlock should report blocks that don't exist
using reportBadBlock (yzhang via cmccabe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -56,7 +56,9 @@ import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -1774,29 +1776,58 @@ public class DataNode extends ReconfigurableBase
return xmitsInProgress.get(); return xmitsInProgress.get();
} }
private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
FsVolumeSpi volume = getFSDataset().getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
LOG.warn(msg);
}
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
StorageType[] xferTargetStorageTypes) throws IOException { StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block); BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
if (!data.isValidBlock(block)) { boolean replicaNotExist = false;
// block does not exist or is under-construction boolean replicaStateNotFinalized = false;
boolean blockFileNotExist = false;
boolean lengthTooShort = false;
try {
data.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
} catch (ReplicaNotFoundException e) {
replicaNotExist = true;
} catch (UnexpectedReplicaStateException e) {
replicaStateNotFinalized = true;
} catch (FileNotFoundException e) {
blockFileNotExist = true;
} catch (EOFException e) {
lengthTooShort = true;
} catch (IOException e) {
// The IOException indicates not being able to access block file,
// treat it the same here as blockFileNotExist, to trigger
// reporting it as a bad block
blockFileNotExist = true;
}
if (replicaNotExist || replicaStateNotFinalized) {
String errStr = "Can't send invalid block " + block; String errStr = "Can't send invalid block " + block;
LOG.info(errStr); LOG.info(errStr);
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr); bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr);
return; return;
} }
if (blockFileNotExist) {
// Check if NN recorded length matches on-disk length // Report back to NN bad block caused by non-existent block file.
long onDiskLength = data.getLength(block); reportBadBlock(bpos, block, "Can't replicate block " + block
if (block.getNumBytes() > onDiskLength) { + " because the block file doesn't exist, or is not accessible");
FsVolumeSpi volume = getFSDataset().getVolume(block); return;
}
if (lengthTooShort) {
// Check if NN recorded length matches on-disk length
// Shorter on-disk len indicates corruption so report NN the corrupt block // Shorter on-disk len indicates corruption so report NN the corrupt block
bpos.reportBadBlocks( reportBadBlock(bpos, block, "Can't replicate block " + block
block, volume.getStorageID(), volume.getStorageType()); + " because on-disk length " + data.getLength(block)
LOG.warn("Can't replicate block " + block
+ " because on-disk length " + onDiskLength
+ " is shorter than NameNode recorded length " + block.getNumBytes()); + " is shorter than NameNode recorded length " + block.getNumBytes());
return; return;
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
/**
* Exception indicating that the replica is in an unexpected state
*/
public class UnexpectedReplicaStateException extends IOException {
private static final long serialVersionUID = 1L;
public UnexpectedReplicaStateException() {
super();
}
public UnexpectedReplicaStateException(ExtendedBlock b,
ReplicaState expectedState) {
super("Replica " + b + " is not in expected state " + expectedState);
}
public UnexpectedReplicaStateException(String msg) {
super(msg);
}
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset; package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection; import java.util.Collection;
@ -35,12 +37,15 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@ -295,6 +300,29 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** Does the dataset contain the block? */ /** Does the dataset contain the block? */
public boolean contains(ExtendedBlock block); public boolean contains(ExtendedBlock block);
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
*
* @throws IOException May be thrown from the methods called.
*/
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException;
/** /**
* Is the block valid? * Is the block valid?
* @return - true if the specified block is valid * @return - true if the specified block is valid

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -81,6 +82,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@ -1451,6 +1453,45 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return finalized; return finalized;
} }
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
*
* @throws IOException May be thrown from the methods called.
*/
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo == null) {
throw new ReplicaNotFoundException(b);
}
if (replicaInfo.getState() != state) {
throw new UnexpectedReplicaStateException(b,state);
}
if (!replicaInfo.getBlockFile().exists()) {
throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
}
long onDiskLength = getLength(b);
if (onDiskLength < minLength) {
throw new EOFException(b + "'s on-disk length " + onDiskLength
+ " is shorter than minLength " + minLength);
}
}
/** /**
* Check whether the given block is a valid one. * Check whether the given block is a valid one.
* valid means finalized * valid means finalized
@ -1470,11 +1511,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/** Does the block exist and have the given state? */ /** Does the block exist and have the given state? */
private boolean isValid(final ExtendedBlock b, final ReplicaState state) { private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), try {
b.getLocalBlock()); checkBlock(b, 0, state);
return replicaInfo != null } catch (IOException e) {
&& replicaInfo.getState() == state return false;
&& replicaInfo.getBlockFile().exists(); }
return true;
} }
/** /**

View File

@ -1825,21 +1825,38 @@ public class MiniDFSCluster {
} }
} }
private int corruptBlockOnDataNodesHelper(ExtendedBlock block,
boolean deleteBlockFile) throws IOException {
int blocksCorrupted = 0;
File[] blockFiles = getAllBlockFiles(block);
for (File f : blockFiles) {
if ((deleteBlockFile && corruptBlockByDeletingBlockFile(f)) ||
(!deleteBlockFile && corruptBlock(f))) {
blocksCorrupted++;
}
}
return blocksCorrupted;
}
/** /**
* Return the contents of the given block on the given datanode. * Return the number of corrupted replicas of the given block.
* *
* @param block block to be corrupted * @param block block to be corrupted
* @throws IOException on error accessing the file for the given block * @throws IOException on error accessing the file for the given block
*/ */
public int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{ public int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{
int blocksCorrupted = 0; return corruptBlockOnDataNodesHelper(block, false);
File[] blockFiles = getAllBlockFiles(block); }
for (File f : blockFiles) {
if (corruptBlock(f)) { /**
blocksCorrupted++; * Return the number of corrupted replicas of the given block.
} *
} * @param block block to be corrupted
return blocksCorrupted; * @throws IOException on error accessing the file for the given block
*/
public int corruptBlockOnDataNodesByDeletingBlockFile(ExtendedBlock block)
throws IOException{
return corruptBlockOnDataNodesHelper(block, true);
} }
public String readBlockOnDataNode(int i, ExtendedBlock block) public String readBlockOnDataNode(int i, ExtendedBlock block)
@ -1888,6 +1905,17 @@ public class MiniDFSCluster {
return true; return true;
} }
/*
* Corrupt a block on a particular datanode by deleting the block file
*/
public static boolean corruptBlockByDeletingBlockFile(File blockFile)
throws IOException {
if (blockFile == null || !blockFile.exists()) {
return false;
}
return blockFile.delete();
}
public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk, public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
long newGenStamp) throws IOException { long newGenStamp) throws IOException {
File blockFile = getBlockFile(dnIndex, blk); File blockFile = getBlockFile(dnIndex, blk);

View File

@ -151,11 +151,8 @@ public class TestReplication {
assertTrue(!fileSys.exists(name)); assertTrue(!fileSys.exists(name));
} }
/* private void testBadBlockReportOnTransfer(
* Test if Datanode reports bad blocks during replication request boolean corruptBlockByDeletingBlockFile) throws Exception {
*/
@Test
public void testBadBlockReportOnTransfer() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
FileSystem fs = null; FileSystem fs = null;
DFSClient dfsClient = null; DFSClient dfsClient = null;
@ -176,7 +173,11 @@ public class TestReplication {
// Corrupt the block belonging to the created file // Corrupt the block belonging to the created file
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block); int blockFilesCorrupted =
corruptBlockByDeletingBlockFile?
cluster.corruptBlockOnDataNodesByDeletingBlockFile(block) :
cluster.corruptBlockOnDataNodes(block);
assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted); assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted);
// Increase replication factor, this should invoke transfer request // Increase replication factor, this should invoke transfer request
@ -201,6 +202,23 @@ public class TestReplication {
cluster.shutdown(); cluster.shutdown();
} }
/*
* Test if Datanode reports bad blocks during replication request
*/
@Test
public void testBadBlockReportOnTransfer() throws Exception {
testBadBlockReportOnTransfer(false);
}
/*
* Test if Datanode reports bad blocks during replication request
* with missing block file
*/
@Test
public void testBadBlockReportOnTransferMissingBlockFile() throws Exception {
testBadBlockReportOnTransfer(true);
}
/** /**
* Tests replication in DFS. * Tests replication in DFS.
*/ */

View File

@ -724,17 +724,52 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return getBInfo(block) != null; return getBInfo(block) != null;
} }
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
*/
@Override // {@link FsDatasetSpi}
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException {
final BInfo binfo = getBInfo(b);
if (binfo == null) {
throw new ReplicaNotFoundException(b);
}
if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) ||
(state != ReplicaState.FINALIZED && binfo.isFinalized())) {
throw new UnexpectedReplicaStateException(b,state);
}
}
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized boolean isValidBlock(ExtendedBlock b) { public synchronized boolean isValidBlock(ExtendedBlock b) {
final BInfo binfo = getBInfo(b); try {
return binfo != null && binfo.isFinalized(); checkBlock(b, 0, ReplicaState.FINALIZED);
} catch (IOException e) {
return false;
}
return true;
} }
/* check if a block is created but not finalized */ /* check if a block is created but not finalized */
@Override @Override
public synchronized boolean isValidRbw(ExtendedBlock b) { public synchronized boolean isValidRbw(ExtendedBlock b) {
final BInfo binfo = getBInfo(b); try {
return binfo != null && !binfo.isFinalized(); checkBlock(b, 0, ReplicaState.RBW);
} catch (IOException e) {
return false;
}
return true;
} }
@Override @Override