HDFS-5437. Fix TestBlockReport and TestBPOfferService failures.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1537365 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-10-31 02:32:25 +00:00
parent 4eebf56ccf
commit 2af5083a35
6 changed files with 86 additions and 163 deletions

View File

@ -54,3 +54,6 @@ IMPROVEMENTS:
HDFS-5435. File append fails to initialize storageIDs. (Junping Du via HDFS-5435. File append fails to initialize storageIDs. (Junping Du via
Arpit Agarwal) Arpit Agarwal)
HDFS-5437. Fix TestBlockReport and TestBPOfferService failures. (Arpit
Agarwal)

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.protocol;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -250,33 +252,28 @@ public class BlockListAsLongs implements Iterable<Block> {
} }
/** /**
* The block-id of the indexTh block * Corrupt the generation stamp of the block with the given index.
* @param index - the block whose block-id is desired * Not meant to be used outside of tests.
* @return the block-id
*/ */
@Deprecated @VisibleForTesting
public long getBlockId(final int index) { public long corruptBlockGSForTesting(final int blockIndex, Random rand) {
return blockId(index); long oldGS = blockList[index2BlockId(blockIndex) + 2];
} while (blockList[index2BlockId(blockIndex) + 2] == oldGS) {
blockList[index2BlockId(blockIndex) + 2] = rand.nextInt();
/** }
* The block-len of the indexTh block return oldGS;
* @param index - the block whose block-len is desired
* @return - the block-len
*/
@Deprecated
public long getBlockLen(final int index) {
return blockLength(index);
} }
/** /**
* The generation stamp of the indexTh block * Corrupt the length of the block with the given index by truncation.
* @param index - the block whose block-len is desired * Not meant to be used outside of tests.
* @return - the generation stamp
*/ */
@Deprecated @VisibleForTesting
public long getBlockGenStamp(final int index) { public long corruptBlockLengthForTesting(final int blockIndex, Random rand) {
return blockGenerationStamp(index); long oldLength = blockList[index2BlockId(blockIndex) + 1];
blockList[index2BlockId(blockIndex) + 1] =
rand.nextInt((int) oldLength - 1);
return oldLength;
} }
/** /**

View File

@ -60,11 +60,7 @@ public class LocatedBlock {
private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0]; private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) { public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
this(b, locs, -1); // startOffset is unknown this(b, locs, -1, false); // startOffset is unknown
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
this(b, locs, startOffset, false);
} }
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset, public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
@ -76,6 +72,11 @@ public class LocatedBlock {
this(b, storages, -1, false); // startOffset is unknown this(b, storages, -1, false); // startOffset is unknown
} }
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes) {
this(b, locs, storageIDs, storageTypes, -1, false, EMPTY_LOCS);
}
public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages, public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
long startOffset, boolean corrupt) { long startOffset, boolean corrupt) {
this(b, DatanodeStorageInfo.toDatanodeInfos(storages), this(b, DatanodeStorageInfo.toDatanodeInfos(storages),

View File

@ -252,7 +252,7 @@ class BPServiceActor implements Runnable {
// TODO: Corrupt flag is set to false for compatibility. We can probably // TODO: Corrupt flag is set to false for compatibility. We can probably
// set it to true here. // set it to true here.
LocatedBlock[] blocks = { LocatedBlock[] blocks = {
new LocatedBlock(block, dnArr, uuids, types, -1, false, null) }; new LocatedBlock(block, dnArr, uuids, types) };
try { try {
bpNamenode.reportBadBlocks(blocks); bpNamenode.reportBadBlocks(blocks);

View File

@ -1029,7 +1029,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override @Override
public StorageReport[] getStorageReports(String bpid) { public StorageReport[] getStorageReports(String bpid) {
throw new UnsupportedOperationException(); return new StorageReport[0];
} }
@Override @Override

View File

@ -116,52 +116,45 @@ public class TestBlockReport {
cluster.shutdown(); cluster.shutdown();
} }
private static StorageBlockReport[] getBlockReports(DataNode dn, String bpid) { // Generate a block report, optionally corrupting the generation
// stamp and/or length of one block.
private static StorageBlockReport[] getBlockReports(
DataNode dn, String bpid, boolean corruptOneBlockGs,
boolean corruptOneBlockLen) {
Map<String, BlockListAsLongs> perVolumeBlockLists = Map<String, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpid); dn.getFSDataset().getBlockReports(bpid);
// Send block report // Send block report
StorageBlockReport[] reports = StorageBlockReport[] reports =
new StorageBlockReport[perVolumeBlockLists.size()]; new StorageBlockReport[perVolumeBlockLists.size()];
boolean corruptedGs = false;
boolean corruptedLen = false;
int i = 0; int reportIndex = 0;
for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) { for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
String storageID = kvPair.getKey(); String storageID = kvPair.getKey();
long[] blockList = kvPair.getValue().getBlockListAsLongs(); BlockListAsLongs blockList = kvPair.getValue();
// Dummy DatanodeStorage object just for sending the block report. // Walk the list of blocks until we find one each to corrupt the
DatanodeStorage dnStorage = new DatanodeStorage(storageID); // generation stamp and length, if so requested.
reports[i++] = new StorageBlockReport(dnStorage, blockList); for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) {
} if (corruptOneBlockGs && !corruptedGs) {
blockList.corruptBlockGSForTesting(i, rand);
return reports; LOG.info("Corrupted the GS for block ID " + i);
} corruptedGs = true;
} else if (corruptOneBlockLen && !corruptedLen) {
// Get block reports but modify the GS of one of the blocks. blockList.corruptBlockLengthForTesting(i, rand);
private static StorageBlockReport[] getBlockReportsCorruptSingleBlockGS( LOG.info("Corrupted the length for block ID " + i);
DataNode dn, String bpid) { corruptedLen = true;
Map<String, BlockListAsLongs> perVolumeBlockLists = } else {
dn.getFSDataset().getBlockReports(bpid); break;
}
// Send block report
StorageBlockReport[] reports =
new StorageBlockReport[perVolumeBlockLists.size()];
boolean corruptedBlock = false;
int i = 0;
for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
String storageID = kvPair.getKey();
long[] blockList = kvPair.getValue().getBlockListAsLongs();
if (!corruptedBlock) {
blockList[4] = rand.nextInt(); // Bad GS.
corruptedBlock = true;
} }
// Dummy DatanodeStorage object just for sending the block report. // Dummy DatanodeStorage object just for sending the block report.
DatanodeStorage dnStorage = new DatanodeStorage(storageID); DatanodeStorage dnStorage = new DatanodeStorage(storageID);
reports[i++] = new StorageBlockReport(dnStorage, blockList); reports[reportIndex++] =
new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
} }
return reports; return reports;
@ -207,7 +200,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N0); DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
List<LocatedBlock> blocksAfterReport = List<LocatedBlock> blocksAfterReport =
@ -289,7 +282,7 @@ public class TestBlockReport {
// all blocks belong to the same file, hence same BP // all blocks belong to the same file, hence same BP
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn0, poolId); StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
@ -315,14 +308,13 @@ public class TestBlockReport {
public void blockReport_03() throws IOException { public void blockReport_03() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath, ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
FILE_SIZE, REPL_FACTOR, rand.nextLong());
// all blocks belong to the same file, hence same BP // all blocks belong to the same file, hence same BP
DataNode dn = cluster.getDataNodes().get(DN_N0); DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReportsCorruptSingleBlockGS(dn, poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
DatanodeCommand dnCmd = DatanodeCommand dnCmd =
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -362,7 +354,7 @@ public class TestBlockReport {
dn.getFSDataset().createRbw(b); dn.getFSDataset().createRbw(b);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
DatanodeCommand dnCmd = DatanodeCommand dnCmd =
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -376,15 +368,6 @@ public class TestBlockReport {
cluster.getNamesystem().getPendingDeletionBlocks(), is(1L)); cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
} }
// Client requests new block from NN. The test corrupts this very block
// and forces new block report.
// The test case isn't specific for BlockReport because it relies on
// BlockScanner which is out of scope of this test
// Keeping the name to be in synch with the test plan
//
public void blockReport_05() {
}
/** /**
* Test creates a file and closes it. * Test creates a file and closes it.
* The second datanode is started in the cluster. * The second datanode is started in the cluster.
@ -399,14 +382,14 @@ public class TestBlockReport {
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); writeFile(METHOD_NAME, FILE_SIZE, filePath);
startDNandWait(filePath, true); startDNandWait(filePath, true);
// all blocks belong to the same file, hence same BP // all blocks belong to the same file, hence same BP
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication Blocks", assertEquals("Wrong number of PendingReplication Blocks",
@ -427,66 +410,40 @@ public class TestBlockReport {
* @throws IOException in case of an error * @throws IOException in case of an error
*/ */
@Test @Test
// Currently this test is failing as expected 'cause the correct behavior is
// not yet implemented (9/15/09)
public void blockReport_07() throws Exception { public void blockReport_07() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat"); Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1; final int DN_N1 = DN_N0 + 1;
// write file and start second node to be "older" than the original // write file and start second node to be "older" than the original
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); writeFile(METHOD_NAME, FILE_SIZE, filePath);
startDNandWait(filePath, true); startDNandWait(filePath, true);
int randIndex = rand.nextInt(blocks.size());
// Get a block and screw its GS
Block corruptedBlock = blocks.get(randIndex);
String secondNode = cluster.getDataNodes().get(DN_N1).getDatanodeId().getDatanodeUuid();
if(LOG.isDebugEnabled()) {
LOG.debug("Working with " + secondNode);
LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
}
corruptBlockGS(corruptedBlock);
if(LOG.isDebugEnabled()) {
LOG.debug("BlockGS after " + blocks.get(randIndex).getGenerationStamp());
LOG.debug("Done corrupting GS of " + corruptedBlock.getBlockName());
}
// all blocks belong to the same file, hence same BP // all blocks belong to the same file, hence same BP
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = getBlockReports(dn, poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, report); cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
printStats();
assertEquals("Wrong number of Corrupted blocks",
1, cluster.getNamesystem().getCorruptReplicaBlocks() +
// the following might have to be added into the equation if
// the same block could be in two different states at the same time
// and then the expected number of has to be changed to '2'
// cluster.getNamesystem().getPendingReplicationBlocks() +
cluster.getNamesystem().getPendingDeletionBlocks());
// Get another block and screw its length to be less than original
if (randIndex == 0)
randIndex++;
else
randIndex--;
corruptedBlock = blocks.get(randIndex);
corruptBlockLen(corruptedBlock);
if(LOG.isDebugEnabled()) {
LOG.debug("Done corrupting length of " + corruptedBlock.getBlockName());
}
report[0] = new StorageBlockReport(
report[0].getStorage(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of Corrupted blocks", assertThat("Wrong number of corrupt blocks",
2, cluster.getNamesystem().getCorruptReplicaBlocks() + cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
cluster.getNamesystem().getPendingReplicationBlocks() + assertThat("Wrong number of PendingDeletion blocks",
cluster.getNamesystem().getPendingDeletionBlocks()); cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
assertThat("Wrong number of PendingReplication blocks",
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
reports = getBlockReports(dn, poolId, true, true);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
printStats();
assertThat("Wrong number of corrupt blocks",
cluster.getNamesystem().getCorruptReplicaBlocks(), is(2L));
assertThat("Wrong number of PendingDeletion blocks",
cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
assertThat("Wrong number of PendingReplication blocks",
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
printStats(); printStats();
@ -529,7 +486,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = getBlockReports(dn, poolId); StorageBlockReport[] report = getBlockReports(dn, poolId, false, false);
cluster.getNameNodeRpc().blockReport(dnR, poolId, report); cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication blocks", assertEquals("Wrong number of PendingReplication blocks",
@ -560,14 +517,11 @@ public class TestBlockReport {
// write file and start second node to be "older" than the original // write file and start second node to be "older" than the original
try { try {
ArrayList<Block> blocks = writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
Block bl = findBlock(filePath, 12 * bytesChkSum); Block bl = findBlock(filePath, 12 * bytesChkSum);
BlockChecker bc = new BlockChecker(filePath); BlockChecker bc = new BlockChecker(filePath);
bc.start(); bc.start();
corruptBlockGS(bl);
corruptBlockLen(bl);
waitForTempReplica(bl, DN_N1); waitForTempReplica(bl, DN_N1);
@ -575,7 +529,7 @@ public class TestBlockReport {
DataNode dn = cluster.getDataNodes().get(DN_N1); DataNode dn = cluster.getDataNodes().get(DN_N1);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = getBlockReports(dn, poolId); StorageBlockReport[] report = getBlockReports(dn, poolId, true, true);
cluster.getNameNodeRpc().blockReport(dnR, poolId, report); cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
printStats(); printStats();
assertEquals("Wrong number of PendingReplication blocks", assertEquals("Wrong number of PendingReplication blocks",
@ -851,38 +805,6 @@ public class TestBlockReport {
((Log4JLogger) TestBlockReport.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger) TestBlockReport.LOG).getLogger().setLevel(Level.ALL);
} }
private void corruptBlockLen(final Block block)
throws IOException {
if (block == null) {
throw new IOException("Block isn't suppose to be null");
}
long oldLen = block.getNumBytes();
long newLen = oldLen - rand.nextLong();
assertTrue("Old and new length shouldn't be the same",
block.getNumBytes() != newLen);
block.setNumBytes(newLen);
if(LOG.isDebugEnabled()) {
LOG.debug("Length of " + block.getBlockName() +
" is changed to " + newLen + " from " + oldLen);
}
}
private void corruptBlockGS(final Block block)
throws IOException {
if (block == null) {
throw new IOException("Block isn't suppose to be null");
}
long oldGS = block.getGenerationStamp();
long newGS = oldGS - rand.nextLong();
assertTrue("Old and new GS shouldn't be the same",
block.getGenerationStamp() != newGS);
block.setGenerationStamp(newGS);
if(LOG.isDebugEnabled()) {
LOG.debug("Generation stamp of " + block.getBlockName() +
" is changed to " + block.getGenerationStamp() + " from " + oldGS);
}
}
private Block findBlock(Path path, long size) throws IOException { private Block findBlock(Path path, long size) throws IOException {
Block ret; Block ret;
List<LocatedBlock> lbs = List<LocatedBlock> lbs =