HDFS-4072. On file deletion remove corresponding blocks pending replications. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1399965 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
adcf7d10b9
commit
0603447f2a
|
@ -474,6 +474,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
|
|
||||||
HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
|
HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
|
||||||
|
|
||||||
|
HDFS-4072. On file deletion remove corresponding blocks pending
|
||||||
|
replications. (Jing Zhao via suresh)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -288,7 +288,7 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static BlockTokenSecretManager createBlockTokenSecretManager(
|
private static BlockTokenSecretManager createBlockTokenSecretManager(
|
||||||
final Configuration conf) throws IOException {
|
final Configuration conf) {
|
||||||
final boolean isEnabled = conf.getBoolean(
|
final boolean isEnabled = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
|
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
|
||||||
|
@ -1260,7 +1260,7 @@ public class BlockManager {
|
||||||
// Move the block-replication into a "pending" state.
|
// Move the block-replication into a "pending" state.
|
||||||
// The reason we use 'pending' is so we can retry
|
// The reason we use 'pending' is so we can retry
|
||||||
// replications that fail after an appropriate amount of time.
|
// replications that fail after an appropriate amount of time.
|
||||||
pendingReplications.add(block, targets.length);
|
pendingReplications.increment(block, targets.length);
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug(
|
||||||
"BLOCK* block " + block
|
"BLOCK* block " + block
|
||||||
|
@ -1306,8 +1306,11 @@ public class BlockManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Choose target datanodes according to the replication policy.
|
* Choose target datanodes according to the replication policy.
|
||||||
* @throws IOException if the number of targets < minimum replication.
|
*
|
||||||
* @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
|
* @throws IOException
|
||||||
|
* if the number of targets < minimum replication.
|
||||||
|
* @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
|
||||||
|
* List, boolean, HashMap, long)
|
||||||
*/
|
*/
|
||||||
public DatanodeDescriptor[] chooseTarget(final String src,
|
public DatanodeDescriptor[] chooseTarget(final String src,
|
||||||
final int numOfReplicas, final DatanodeDescriptor client,
|
final int numOfReplicas, final DatanodeDescriptor client,
|
||||||
|
@ -1811,7 +1814,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue the given reported block for later processing in the
|
* Queue the given reported block for later processing in the
|
||||||
* standby node. {@see PendingDataNodeMessages}.
|
* standby node. @see PendingDataNodeMessages.
|
||||||
* @param reason a textual reason to report in the debug logs
|
* @param reason a textual reason to report in the debug logs
|
||||||
*/
|
*/
|
||||||
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
|
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
|
||||||
|
@ -1976,14 +1979,15 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Faster version of {@link addStoredBlock()}, intended for use with
|
* Faster version of
|
||||||
* initial block report at startup. If not in startup safe mode, will
|
* {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
|
||||||
* call standard addStoredBlock().
|
* , intended for use with initial block report at startup. If not in startup
|
||||||
* Assumes this method is called "immediately" so there is no need to
|
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
||||||
* refresh the storedBlock from blocksMap.
|
* called "immediately" so there is no need to refresh the storedBlock from
|
||||||
* Doesn't handle underReplication/overReplication, or worry about
|
* blocksMap. Doesn't handle underReplication/overReplication, or worry about
|
||||||
* pendingReplications or corruptReplicas, because it's in startup safe mode.
|
* pendingReplications or corruptReplicas, because it's in startup safe mode.
|
||||||
* Doesn't log every block, because there are typically millions of them.
|
* Doesn't log every block, because there are typically millions of them.
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void addStoredBlockImmediate(BlockInfo storedBlock,
|
private void addStoredBlockImmediate(BlockInfo storedBlock,
|
||||||
|
@ -2505,7 +2509,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
//
|
//
|
||||||
// Modify the blocks->datanode map and node's map.
|
// Modify the blocks->datanode map and node's map.
|
||||||
//
|
//
|
||||||
pendingReplications.remove(block);
|
pendingReplications.decrement(block);
|
||||||
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
|
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
|
||||||
delHintNode);
|
delHintNode);
|
||||||
}
|
}
|
||||||
|
@ -2641,7 +2645,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simpler, faster form of {@link countNodes()} that only returns the number
|
* Simpler, faster form of {@link #countNodes(Block)} that only returns the number
|
||||||
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
||||||
* then it gains speed by ignoring issues of excess replicas or nodes
|
* then it gains speed by ignoring issues of excess replicas or nodes
|
||||||
* that are decommissioned or in process of becoming decommissioned.
|
* that are decommissioned or in process of becoming decommissioned.
|
||||||
|
@ -2790,6 +2794,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
addToInvalidates(block);
|
addToInvalidates(block);
|
||||||
corruptReplicas.removeFromCorruptReplicasMap(block);
|
corruptReplicas.removeFromCorruptReplicasMap(block);
|
||||||
blocksMap.removeBlock(block);
|
blocksMap.removeBlock(block);
|
||||||
|
// Remove the block from pendingReplications
|
||||||
|
pendingReplications.remove(block);
|
||||||
if (postponedMisreplicatedBlocks.remove(block)) {
|
if (postponedMisreplicatedBlocks.remove(block)) {
|
||||||
postponedMisreplicatedBlocksCount--;
|
postponedMisreplicatedBlocksCount--;
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ class PendingReplicationBlocks {
|
||||||
/**
|
/**
|
||||||
* Add a block to the list of pending Replications
|
* Add a block to the list of pending Replications
|
||||||
*/
|
*/
|
||||||
void add(Block block, int numReplicas) {
|
void increment(Block block, int numReplicas) {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
PendingBlockInfo found = pendingReplications.get(block);
|
PendingBlockInfo found = pendingReplications.get(block);
|
||||||
if (found == null) {
|
if (found == null) {
|
||||||
|
@ -89,7 +89,7 @@ class PendingReplicationBlocks {
|
||||||
* Decrement the number of pending replication requests
|
* Decrement the number of pending replication requests
|
||||||
* for this block.
|
* for this block.
|
||||||
*/
|
*/
|
||||||
void remove(Block block) {
|
void decrement(Block block) {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
PendingBlockInfo found = pendingReplications.get(block);
|
PendingBlockInfo found = pendingReplications.get(block);
|
||||||
if (found != null) {
|
if (found != null) {
|
||||||
|
@ -104,6 +104,16 @@ class PendingReplicationBlocks {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the record about the given block from pendingReplications.
|
||||||
|
* @param block The given block whose pending replication requests need to be
|
||||||
|
* removed
|
||||||
|
*/
|
||||||
|
void remove(Block block) {
|
||||||
|
synchronized (pendingReplications) {
|
||||||
|
pendingReplications.remove(block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void clear() {
|
public void clear() {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
|
|
|
@ -20,14 +20,30 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests the internals of PendingReplicationBlocks.java
|
* This class tests the internals of PendingReplicationBlocks.java,
|
||||||
|
* as well as how PendingReplicationBlocks acts in BlockManager
|
||||||
*/
|
*/
|
||||||
public class TestPendingReplication {
|
public class TestPendingReplication {
|
||||||
final static int TIMEOUT = 3; // 3 seconds
|
final static int TIMEOUT = 3; // 3 seconds
|
||||||
|
private static final int DFS_REPLICATION_INTERVAL = 1;
|
||||||
|
// Number of datanodes in the cluster
|
||||||
|
private static final int DATANODE_COUNT = 5;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPendingReplication() {
|
public void testPendingReplication() {
|
||||||
|
@ -40,7 +56,7 @@ public class TestPendingReplication {
|
||||||
//
|
//
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
Block block = new Block(i, i, 0);
|
Block block = new Block(i, i, 0);
|
||||||
pendingReplications.add(block, i);
|
pendingReplications.increment(block, i);
|
||||||
}
|
}
|
||||||
assertEquals("Size of pendingReplications ",
|
assertEquals("Size of pendingReplications ",
|
||||||
10, pendingReplications.size());
|
10, pendingReplications.size());
|
||||||
|
@ -50,15 +66,15 @@ public class TestPendingReplication {
|
||||||
// remove one item and reinsert it
|
// remove one item and reinsert it
|
||||||
//
|
//
|
||||||
Block blk = new Block(8, 8, 0);
|
Block blk = new Block(8, 8, 0);
|
||||||
pendingReplications.remove(blk); // removes one replica
|
pendingReplications.decrement(blk); // removes one replica
|
||||||
assertEquals("pendingReplications.getNumReplicas ",
|
assertEquals("pendingReplications.getNumReplicas ",
|
||||||
7, pendingReplications.getNumReplicas(blk));
|
7, pendingReplications.getNumReplicas(blk));
|
||||||
|
|
||||||
for (int i = 0; i < 7; i++) {
|
for (int i = 0; i < 7; i++) {
|
||||||
pendingReplications.remove(blk); // removes all replicas
|
pendingReplications.decrement(blk); // removes all replicas
|
||||||
}
|
}
|
||||||
assertTrue(pendingReplications.size() == 9);
|
assertTrue(pendingReplications.size() == 9);
|
||||||
pendingReplications.add(blk, 8);
|
pendingReplications.increment(blk, 8);
|
||||||
assertTrue(pendingReplications.size() == 10);
|
assertTrue(pendingReplications.size() == 10);
|
||||||
|
|
||||||
//
|
//
|
||||||
|
@ -86,7 +102,7 @@ public class TestPendingReplication {
|
||||||
|
|
||||||
for (int i = 10; i < 15; i++) {
|
for (int i = 10; i < 15; i++) {
|
||||||
Block block = new Block(i, i, 0);
|
Block block = new Block(i, i, 0);
|
||||||
pendingReplications.add(block, i);
|
pendingReplications.increment(block, i);
|
||||||
}
|
}
|
||||||
assertTrue(pendingReplications.size() == 15);
|
assertTrue(pendingReplications.size() == 15);
|
||||||
|
|
||||||
|
@ -116,4 +132,70 @@ public class TestPendingReplication {
|
||||||
}
|
}
|
||||||
pendingReplications.stop();
|
pendingReplications.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if BlockManager can correctly remove corresponding pending records
|
||||||
|
* when a file is deleted
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPendingAndInvalidate() throws Exception {
|
||||||
|
final Configuration CONF = new HdfsConfiguration();
|
||||||
|
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||||
|
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
|
DFS_REPLICATION_INTERVAL);
|
||||||
|
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
|
||||||
|
DFS_REPLICATION_INTERVAL);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
|
||||||
|
DATANODE_COUNT).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
BlockManager bm = namesystem.getBlockManager();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
try {
|
||||||
|
// 1. create a file
|
||||||
|
Path filePath = new Path("/tmp.txt");
|
||||||
|
DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
|
||||||
|
|
||||||
|
// 2. disable the heartbeats
|
||||||
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
|
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. mark a couple of blocks as corrupt
|
||||||
|
LocatedBlock block = NameNodeAdapter.getBlockLocations(
|
||||||
|
cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
|
||||||
|
cluster.getNamesystem().writeLock();
|
||||||
|
try {
|
||||||
|
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
|
||||||
|
"TEST");
|
||||||
|
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
|
||||||
|
"TEST");
|
||||||
|
} finally {
|
||||||
|
cluster.getNamesystem().writeUnlock();
|
||||||
|
}
|
||||||
|
BlockManagerTestUtil.computeAllPendingWork(bm);
|
||||||
|
BlockManagerTestUtil.updateState(bm);
|
||||||
|
assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
|
||||||
|
assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock()
|
||||||
|
.getLocalBlock()), 2);
|
||||||
|
|
||||||
|
// 4. delete the file
|
||||||
|
fs.delete(filePath, true);
|
||||||
|
// retry at most 10 times, each time sleep for 1s. Note that 10s is much
|
||||||
|
// less than the default pending record timeout (5~10min)
|
||||||
|
int retries = 10;
|
||||||
|
long pendingNum = bm.getPendingReplicationBlocksCount();
|
||||||
|
while (pendingNum != 0 && retries-- > 0) {
|
||||||
|
Thread.sleep(1000); // let NN do the deletion
|
||||||
|
BlockManagerTestUtil.updateState(bm);
|
||||||
|
pendingNum = bm.getPendingReplicationBlocksCount();
|
||||||
|
}
|
||||||
|
assertEquals(pendingNum, 0L);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue