HDFS-4072. Merging change 1399965 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1399966 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-10-19 04:58:55 +00:00
parent c6f60985a8
commit 887e8a22d5
4 changed files with 122 additions and 21 deletions

View File

@ -137,6 +137,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli) HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
HDFS-4072. On file deletion remove corresponding blocks pending
replications. (Jing Zhao via suresh)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -289,7 +289,7 @@ public class BlockManager {
} }
private static BlockTokenSecretManager createBlockTokenSecretManager( private static BlockTokenSecretManager createBlockTokenSecretManager(
final Configuration conf) throws IOException { final Configuration conf) {
final boolean isEnabled = conf.getBoolean( final boolean isEnabled = conf.getBoolean(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT); DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
@ -1261,7 +1261,7 @@ public class BlockManager {
// Move the block-replication into a "pending" state. // Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry // The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time. // replications that fail after an appropriate amount of time.
pendingReplications.add(block, targets.length); pendingReplications.increment(block, targets.length);
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug( NameNode.stateChangeLog.debug(
"BLOCK* block " + block "BLOCK* block " + block
@ -1307,8 +1307,11 @@ public class BlockManager {
/** /**
* Choose target datanodes according to the replication policy. * Choose target datanodes according to the replication policy.
* @throws IOException if the number of targets < minimum replication. *
* @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long) * @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
* List, boolean, HashMap, long)
*/ */
public DatanodeDescriptor[] chooseTarget(final String src, public DatanodeDescriptor[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client, final int numOfReplicas, final DatanodeDescriptor client,
@ -1812,7 +1815,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
/** /**
* Queue the given reported block for later processing in the * Queue the given reported block for later processing in the
* standby node. {@see PendingDataNodeMessages}. * standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs * @param reason a textual reason to report in the debug logs
*/ */
private void queueReportedBlock(DatanodeDescriptor dn, Block block, private void queueReportedBlock(DatanodeDescriptor dn, Block block,
@ -1977,14 +1980,15 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
} }
/** /**
* Faster version of {@link addStoredBlock()}, intended for use with * Faster version of
* initial block report at startup. If not in startup safe mode, will * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
* call standard addStoredBlock(). * , intended for use with initial block report at startup. If not in startup
* Assumes this method is called "immediately" so there is no need to * safe mode, will call standard addStoredBlock(). Assumes this method is
* refresh the storedBlock from blocksMap. * called "immediately" so there is no need to refresh the storedBlock from
* Doesn't handle underReplication/overReplication, or worry about * blocksMap. Doesn't handle underReplication/overReplication, or worry about
* pendingReplications or corruptReplicas, because it's in startup safe mode. * pendingReplications or corruptReplicas, because it's in startup safe mode.
* Doesn't log every block, because there are typically millions of them. * Doesn't log every block, because there are typically millions of them.
*
* @throws IOException * @throws IOException
*/ */
private void addStoredBlockImmediate(BlockInfo storedBlock, private void addStoredBlockImmediate(BlockInfo storedBlock,
@ -2534,7 +2538,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// //
// Modify the blocks->datanode map and node's map. // Modify the blocks->datanode map and node's map.
// //
pendingReplications.remove(block); pendingReplications.decrement(block);
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED, processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
delHintNode); delHintNode);
} }
@ -2670,7 +2674,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
} }
/** /**
* Simpler, faster form of {@link countNodes()} that only returns the number * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
* of live nodes. If in startup safemode (or its 30-sec extension period), * of live nodes. If in startup safemode (or its 30-sec extension period),
* then it gains speed by ignoring issues of excess replicas or nodes * then it gains speed by ignoring issues of excess replicas or nodes
* that are decommissioned or in process of becoming decommissioned. * that are decommissioned or in process of becoming decommissioned.
@ -2819,6 +2823,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
addToInvalidates(block); addToInvalidates(block);
corruptReplicas.removeFromCorruptReplicasMap(block); corruptReplicas.removeFromCorruptReplicasMap(block);
blocksMap.removeBlock(block); blocksMap.removeBlock(block);
// Remove the block from pendingReplications
pendingReplications.remove(block);
if (postponedMisreplicatedBlocks.remove(block)) { if (postponedMisreplicatedBlocks.remove(block)) {
postponedMisreplicatedBlocksCount--; postponedMisreplicatedBlocksCount--;
} }

View File

@ -72,7 +72,7 @@ class PendingReplicationBlocks {
/** /**
* Add a block to the list of pending Replications * Add a block to the list of pending Replications
*/ */
void add(Block block, int numReplicas) { void increment(Block block, int numReplicas) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found == null) { if (found == null) {
@ -89,7 +89,7 @@ class PendingReplicationBlocks {
* Decrement the number of pending replication requests * Decrement the number of pending replication requests
* for this block. * for this block.
*/ */
void remove(Block block) { void decrement(Block block) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found != null) { if (found != null) {
@ -104,6 +104,16 @@ class PendingReplicationBlocks {
} }
} }
/**
* Remove the record about the given block from pendingReplications.
* @param block The given block whose pending replication requests need to be
* removed
*/
void remove(Block block) {
synchronized (pendingReplications) {
pendingReplications.remove(block);
}
}
public void clear() { public void clear() {
synchronized (pendingReplications) { synchronized (pendingReplications) {

View File

@ -20,14 +20,30 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.Test; import org.junit.Test;
/** /**
* This class tests the internals of PendingReplicationBlocks.java * This class tests the internals of PendingReplicationBlocks.java,
* as well as how PendingReplicationBlocks acts in BlockManager
*/ */
public class TestPendingReplication { public class TestPendingReplication {
final static int TIMEOUT = 3; // 3 seconds final static int TIMEOUT = 3; // 3 seconds
private static final int DFS_REPLICATION_INTERVAL = 1;
// Number of datanodes in the cluster
private static final int DATANODE_COUNT = 5;
@Test @Test
public void testPendingReplication() { public void testPendingReplication() {
@ -40,7 +56,7 @@ public class TestPendingReplication {
// //
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Block block = new Block(i, i, 0); Block block = new Block(i, i, 0);
pendingReplications.add(block, i); pendingReplications.increment(block, i);
} }
assertEquals("Size of pendingReplications ", assertEquals("Size of pendingReplications ",
10, pendingReplications.size()); 10, pendingReplications.size());
@ -50,15 +66,15 @@ public class TestPendingReplication {
// remove one item and reinsert it // remove one item and reinsert it
// //
Block blk = new Block(8, 8, 0); Block blk = new Block(8, 8, 0);
pendingReplications.remove(blk); // removes one replica pendingReplications.decrement(blk); // removes one replica
assertEquals("pendingReplications.getNumReplicas ", assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk)); 7, pendingReplications.getNumReplicas(blk));
for (int i = 0; i < 7; i++) { for (int i = 0; i < 7; i++) {
pendingReplications.remove(blk); // removes all replicas pendingReplications.decrement(blk); // removes all replicas
} }
assertTrue(pendingReplications.size() == 9); assertTrue(pendingReplications.size() == 9);
pendingReplications.add(blk, 8); pendingReplications.increment(blk, 8);
assertTrue(pendingReplications.size() == 10); assertTrue(pendingReplications.size() == 10);
// //
@ -86,7 +102,7 @@ public class TestPendingReplication {
for (int i = 10; i < 15; i++) { for (int i = 10; i < 15; i++) {
Block block = new Block(i, i, 0); Block block = new Block(i, i, 0);
pendingReplications.add(block, i); pendingReplications.increment(block, i);
} }
assertTrue(pendingReplications.size() == 15); assertTrue(pendingReplications.size() == 15);
@ -116,4 +132,70 @@ public class TestPendingReplication {
} }
pendingReplications.stop(); pendingReplications.stop();
} }
/**
* Test if BlockManager can correctly remove corresponding pending records
* when a file is deleted
*
* @throws Exception
*/
@Test
public void testPendingAndInvalidate() throws Exception {
final Configuration CONF = new HdfsConfiguration();
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
DATANODE_COUNT).build();
cluster.waitActive();
FSNamesystem namesystem = cluster.getNamesystem();
BlockManager bm = namesystem.getBlockManager();
DistributedFileSystem fs = cluster.getFileSystem();
try {
// 1. create a file
Path filePath = new Path("/tmp.txt");
DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
// 2. disable the heartbeats
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
// 3. mark a couple of blocks as corrupt
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
"TEST");
} finally {
cluster.getNamesystem().writeUnlock();
}
BlockManagerTestUtil.computeAllPendingWork(bm);
BlockManagerTestUtil.updateState(bm);
assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock()
.getLocalBlock()), 2);
// 4. delete the file
fs.delete(filePath, true);
// retry at most 10 times, each time sleep for 1s. Note that 10s is much
// less than the default pending record timeout (5~10min)
int retries = 10;
long pendingNum = bm.getPendingReplicationBlocksCount();
while (pendingNum != 0 && retries-- > 0) {
Thread.sleep(1000); // let NN do the deletion
BlockManagerTestUtil.updateState(bm);
pendingNum = bm.getPendingReplicationBlocksCount();
}
assertEquals(pendingNum, 0L);
} finally {
cluster.shutdown();
}
}
} }