HDFS-7235. DataNode#transferBlock should report blocks that don't exist using reportBadBlock (yzhang via cmccabe)

(cherry picked from commit ac9ab037e9)
This commit is contained in:
Colin Patrick Mccabe 2014-10-28 16:41:22 -07:00
parent d51ea6a248
commit 842a54a5f6
8 changed files with 271 additions and 40 deletions

View File

@ -109,6 +109,10 @@ Release 2.7.0 - UNRELEASED
HDFS-7301. TestMissingBlocksAlert should use MXBeans instead of old web UI.
(Zhe Zhang via wheat9)
HDFS-7235. DataNode#transferBlock should report blocks that don't exist
using reportBadBlock (yzhang via cmccabe)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -56,7 +56,9 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -1776,29 +1778,58 @@ int getXmitsInProgress() {
return xmitsInProgress.get();
}
private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
FsVolumeSpi volume = getFSDataset().getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
LOG.warn(msg);
}
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
if (!data.isValidBlock(block)) {
// block does not exist or is under-construction
boolean replicaNotExist = false;
boolean replicaStateNotFinalized = false;
boolean blockFileNotExist = false;
boolean lengthTooShort = false;
try {
data.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
} catch (ReplicaNotFoundException e) {
replicaNotExist = true;
} catch (UnexpectedReplicaStateException e) {
replicaStateNotFinalized = true;
} catch (FileNotFoundException e) {
blockFileNotExist = true;
} catch (EOFException e) {
lengthTooShort = true;
} catch (IOException e) {
// The IOException indicates not being able to access block file,
// treat it the same here as blockFileNotExist, to trigger
// reporting it as a bad block
blockFileNotExist = true;
}
if (replicaNotExist || replicaStateNotFinalized) {
String errStr = "Can't send invalid block " + block;
LOG.info(errStr);
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errStr);
return;
}
if (blockFileNotExist) {
// Report back to NN bad block caused by non-existent block file.
reportBadBlock(bpos, block, "Can't replicate block " + block
+ " because the block file doesn't exist, or is not accessible");
return;
}
if (lengthTooShort) {
// Check if NN recorded length matches on-disk length
long onDiskLength = data.getLength(block);
if (block.getNumBytes() > onDiskLength) {
FsVolumeSpi volume = getFSDataset().getVolume(block);
// Shorter on-disk len indicates corruption so report NN the corrupt block
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
LOG.warn("Can't replicate block " + block
+ " because on-disk length " + onDiskLength
reportBadBlock(bpos, block, "Can't replicate block " + block
+ " because on-disk length " + data.getLength(block)
+ " is shorter than NameNode recorded length " + block.getNumBytes());
return;
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
/**
* Exception indicating that the replica is in an unexpected state
*/
public class UnexpectedReplicaStateException extends IOException {
private static final long serialVersionUID = 1L;
public UnexpectedReplicaStateException() {
super();
}
public UnexpectedReplicaStateException(ExtendedBlock b,
ReplicaState expectedState) {
super("Replica " + b + " is not in expected state " + expectedState);
}
public UnexpectedReplicaStateException(String msg) {
super(msg);
}
}

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
@ -35,12 +37,15 @@
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
@ -298,6 +303,29 @@ public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
/** Does the dataset contain the block? */
public boolean contains(ExtendedBlock block);
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
*
* @throws IOException May be thrown from the methods called.
*/
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException;
/**
* Is the block valid?
* @return - true if the specified block is valid

View File

@ -19,6 +19,7 @@
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
@ -81,6 +82,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@ -1451,6 +1453,45 @@ public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage
return finalized;
}
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
* @throws FileNotFoundException If the block file is not found or there
* was an error locating it.
* @throws EOFException If the replica length is too short.
*
* @throws IOException May be thrown from the methods called.
*/
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException,
FileNotFoundException, EOFException, IOException {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo == null) {
throw new ReplicaNotFoundException(b);
}
if (replicaInfo.getState() != state) {
throw new UnexpectedReplicaStateException(b,state);
}
if (!replicaInfo.getBlockFile().exists()) {
throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
}
long onDiskLength = getLength(b);
if (onDiskLength < minLength) {
throw new EOFException(b + "'s on-disk length " + onDiskLength
+ " is shorter than minLength " + minLength);
}
}
/**
* Check whether the given block is a valid one.
* valid means finalized
@ -1470,11 +1511,12 @@ public boolean isValidRbw(final ExtendedBlock b) {
/** Does the block exist and have the given state? */
private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
return replicaInfo != null
&& replicaInfo.getState() == state
&& replicaInfo.getBlockFile().exists();
try {
checkBlock(b, 0, state);
} catch (IOException e) {
return false;
}
return true;
}
/**

View File

@ -1825,21 +1825,38 @@ public synchronized void restartNameNode(int nnIndex, boolean waitActive,
}
}
private int corruptBlockOnDataNodesHelper(ExtendedBlock block,
boolean deleteBlockFile) throws IOException {
int blocksCorrupted = 0;
File[] blockFiles = getAllBlockFiles(block);
for (File f : blockFiles) {
if ((deleteBlockFile && corruptBlockByDeletingBlockFile(f)) ||
(!deleteBlockFile && corruptBlock(f))) {
blocksCorrupted++;
}
}
return blocksCorrupted;
}
/**
* Return the contents of the given block on the given datanode.
* Return the number of corrupted replicas of the given block.
*
* @param block block to be corrupted
* @throws IOException on error accessing the file for the given block
*/
public int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{
int blocksCorrupted = 0;
File[] blockFiles = getAllBlockFiles(block);
for (File f : blockFiles) {
if (corruptBlock(f)) {
blocksCorrupted++;
return corruptBlockOnDataNodesHelper(block, false);
}
}
return blocksCorrupted;
/**
* Return the number of corrupted replicas of the given block.
*
* @param block block to be corrupted
* @throws IOException on error accessing the file for the given block
*/
public int corruptBlockOnDataNodesByDeletingBlockFile(ExtendedBlock block)
throws IOException{
return corruptBlockOnDataNodesHelper(block, true);
}
public String readBlockOnDataNode(int i, ExtendedBlock block)
@ -1888,6 +1905,17 @@ public static boolean corruptBlock(File blockFile) throws IOException {
return true;
}
/*
* Corrupt a block on a particular datanode by deleting the block file
*/
public static boolean corruptBlockByDeletingBlockFile(File blockFile)
throws IOException {
if (blockFile == null || !blockFile.exists()) {
return false;
}
return blockFile.delete();
}
public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
long newGenStamp) throws IOException {
File blockFile = getBlockFile(dnIndex, blk);

View File

@ -151,11 +151,8 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(!fileSys.exists(name));
}
/*
* Test if Datanode reports bad blocks during replication request
*/
@Test
public void testBadBlockReportOnTransfer() throws Exception {
private void testBadBlockReportOnTransfer(
boolean corruptBlockByDeletingBlockFile) throws Exception {
Configuration conf = new HdfsConfiguration();
FileSystem fs = null;
DFSClient dfsClient = null;
@ -176,7 +173,11 @@ public void testBadBlockReportOnTransfer() throws Exception {
// Corrupt the block belonging to the created file
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
int blockFilesCorrupted =
corruptBlockByDeletingBlockFile?
cluster.corruptBlockOnDataNodesByDeletingBlockFile(block) :
cluster.corruptBlockOnDataNodes(block);
assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted);
// Increase replication factor, this should invoke transfer request
@ -201,6 +202,23 @@ public void testBadBlockReportOnTransfer() throws Exception {
cluster.shutdown();
}
/*
* Test if Datanode reports bad blocks during replication request
*/
@Test
public void testBadBlockReportOnTransfer() throws Exception {
testBadBlockReportOnTransfer(false);
}
/*
* Test if Datanode reports bad blocks during replication request
* with missing block file
*/
@Test
public void testBadBlockReportOnTransferMissingBlockFile() throws Exception {
testBadBlockReportOnTransfer(true);
}
/**
* Tests replication in DFS.
*/

View File

@ -724,17 +724,52 @@ public boolean contains(ExtendedBlock block) {
return getBInfo(block) != null;
}
/**
* Check if a block is valid.
*
* @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we
* will check that the replica has this state.
*
* @throws ReplicaNotFoundException If the replica is not found
*
* @throws UnexpectedReplicaStateException If the replica is not in the
* expected state.
*/
@Override // {@link FsDatasetSpi}
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException {
final BInfo binfo = getBInfo(b);
if (binfo == null) {
throw new ReplicaNotFoundException(b);
}
if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) ||
(state != ReplicaState.FINALIZED && binfo.isFinalized())) {
throw new UnexpectedReplicaStateException(b,state);
}
}
@Override // FsDatasetSpi
public synchronized boolean isValidBlock(ExtendedBlock b) {
final BInfo binfo = getBInfo(b);
return binfo != null && binfo.isFinalized();
try {
checkBlock(b, 0, ReplicaState.FINALIZED);
} catch (IOException e) {
return false;
}
return true;
}
/* check if a block is created but not finalized */
@Override
public synchronized boolean isValidRbw(ExtendedBlock b) {
final BInfo binfo = getBInfo(b);
return binfo != null && !binfo.isFinalized();
try {
checkBlock(b, 0, ReplicaState.RBW);
} catch (IOException e) {
return false;
}
return true;
}
@Override