From 051ac2d9fa4b3b2980ea3349209a97610d1437f8 Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Wed, 7 Mar 2012 06:38:53 +0000
Subject: [PATCH] HDFS-2495. Merging change r1199024 from trunk to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297862 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   7 +-
 .../server/blockmanagement/BlockManager.java  | 311 ++++++++++--------
 .../blockmanagement/TestBlockManager.java     |  33 +-
 3 files changed, 207 insertions(+), 144 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index caaa1293006..7a266565fe4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -102,14 +102,13 @@ Release 0.23.3 - UNRELEASED
 
     HDFS-2334. Add Closeable to JournalManager. (Ivan Kelly via jitendra)
 
-<<<<<<< .working
   OPTIMIZATIONS
-
-=======
+
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
->>>>>>> .merge-right.r1196676
+
+    HDFS-2495. Increase granularity of write operations in ReplicationMonitor
+    thus reducing contention for write lock. (Tomasz Nykiel via hairong)
+
   BUG FIXES
 
     HDFS-2481. Unknown protocol: org.apache.hadoop.hdfs.protocol.ClientProtocol.

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index aac7a059d04..f2b6cdabb3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -940,15 +940,7 @@ public class BlockManager {
       chooseUnderReplicatedBlocks(blocksToProcess);
 
     // replicate blocks
-    int scheduledReplicationCount = 0;
-    for (int i=0; i<blocksToReplicate.size(); i++) {
-      for(Block block : blocksToReplicate.get(i)) {
-        if (computeReplicationWorkForBlock(block, i)) {
-          scheduledReplicationCount++;
-        }
-      }
-    }
-    return scheduledReplicationCount;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
   /**
@@ -963,155 +955,186 @@ public class BlockManager {
    */
   @VisibleForTesting
-  boolean computeReplicationWorkForBlock(Block block, int priority) {
+  int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
+    int scheduledWork = 0;
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
+        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
+          for (Block block : blocksToReplicate.get(priority)) {
+            // block should belong to a file
+            fileINode = blocksMap.getINode(block);
+            // abandoned block or block reopened for append
+            if(fileINode == null || fileINode.isUnderConstruction()) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              continue;
+            }
 
-        requiredReplication = fileINode.getReplication();
+            requiredReplication = fileINode.getReplication();
 
-        // get a source data-node
-        containingNodes = new ArrayList<DatanodeDescriptor>();
-        liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(
-            block, containingNodes, liveReplicaNodes, numReplicas);
-        if(srcNode == null) // block can not be replicated from any node
-          return false;
+            // get a source data-node
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+            NumberReplicas numReplicas = new NumberReplicas();
+            srcNode = chooseSourceDatanode(
+                block, containingNodes, liveReplicaNodes, numReplicas);
+            if(srcNode == null) // block can not be replicated from any node
+              continue;
 
-        assert liveReplicaNodes.size() == numReplicas.liveReplicas();
-        // do not schedule more if enough replicas is already pending
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-          pendingReplications.getNumReplicas(block);
+            assert liveReplicaNodes.size() == numReplicas.liveReplicas();
+            // do not schedule more if enough replicas is already pending
+            numEffectiveReplicas = numReplicas.liveReplicas() +
+              pendingReplications.getNumReplicas(block);
 
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
+            if (numEffectiveReplicas >= requiredReplication) {
+              if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                   (blockHasEnoughRacks(block)) ) {
+                neededReplications.remove(block, priority); // remove from neededReplications
+                replIndex--;
+                NameNode.stateChangeLog.info("BLOCK* "
+                    + "Removing block " + block
+                    + " from neededReplications as it has enough replicas.");
+                continue;
+              }
+            }
+
+            if (numReplicas.liveReplicas() < requiredReplication) {
+              additionalReplRequired = requiredReplication
+                  - numEffectiveReplicas;
+            } else {
+              additionalReplRequired = 1; // Needed on a new rack
+            }
+            work.add(new ReplicationWork(block, fileINode, srcNode,
+                containingNodes, liveReplicaNodes, additionalReplRequired,
+                priority));
           }
         }
-
-        if (numReplicas.liveReplicas() < requiredReplication) {
-          additionalReplRequired = requiredReplication - numEffectiveReplicas;
-        } else {
-          additionalReplRequired = 1; //Needed on a new rack
-        }
-
       }
     } finally {
      namesystem.writeUnlock();
    }
-
-    // Exclude all of the containing nodes from being targets.
-    // This list includes decommissioning or corrupt nodes.
-    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
-    for (DatanodeDescriptor dn : containingNodes) {
-      excludedNodes.put(dn, dn);
-    }
-    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    // It is costly to extract the filename for which chooseTargets is called,
-    // so for now we pass in the Inode itself.
-    DatanodeDescriptor targets[] =
-        blockplacement.chooseTarget(fileINode, additionalReplRequired,
-            srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
-    if(targets.length == 0)
-      return false;
+    HashMap<Node, Node> excludedNodes
+        = new HashMap<Node, Node>();
+    for(ReplicationWork rw : work){
+      // Exclude all of the containing nodes from being targets.
+      // This list includes decommissioning or corrupt nodes.
+      excludedNodes.clear();
+      for (DatanodeDescriptor dn : rw.containingNodes) {
+        excludedNodes.put(dn, dn);
+      }
+
+      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+      // It is costly to extract the filename for which chooseTargets is called,
+      // so for now we pass in the Inode itself.
+      rw.targets = blockplacement.chooseTarget(rw.fileINode,
+          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
+          excludedNodes, rw.block.getNumBytes());
+    }
 
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        // Recheck since global lock was released
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if(targets == null || targets.length == 0){
+          rw.targets = null;
+          continue;
         }
 
-        requiredReplication = fileINode.getReplication();
-        // do not schedule more if enough replicas is already pending
-        NumberReplicas numReplicas = countNodes(block);
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-          pendingReplications.getNumReplicas(block);
+        synchronized (neededReplications) {
+          Block block = rw.block;
+          int priority = rw.priority;
+          // Recheck since global lock was released
+          // block should belong to a file
+          fileINode = blocksMap.getINode(block);
+          // abandoned block or block reopened for append
+          if(fileINode == null || fileINode.isUnderConstruction()) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            rw.targets = null;
+            replIndex--;
+            continue;
+          }
+          requiredReplication = fileINode.getReplication();
 
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
+          // do not schedule more if enough replicas is already pending
+          NumberReplicas numReplicas = countNodes(block);
+          numEffectiveReplicas = numReplicas.liveReplicas() +
+            pendingReplications.getNumReplicas(block);
+
+          if (numEffectiveReplicas >= requiredReplication) {
+            if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                 (blockHasEnoughRacks(block)) ) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              rw.targets = null;
+              NameNode.stateChangeLog.info("BLOCK* "
+                  + "Removing block " + block
+                  + " from neededReplications as it has enough replicas.");
+              continue;
+            }
+          }
+
+          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+               (!blockHasEnoughRacks(block)) ) {
+            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+              //No use continuing, unless a new rack in this case
+              continue;
+            }
+          }
+
+          // Add block to the to be replicated list
+          rw.srcNode.addBlockToBeReplicated(block, targets);
+          scheduledWork++;
+
+          for (DatanodeDescriptor dn : targets) {
+            dn.incBlocksScheduled();
+          }
+
+          // Move the block-replication into a "pending" state.
+          // The reason we use 'pending' is so we can retry
+          // replications that fail after an appropriate amount of time.
+          pendingReplications.add(block, targets.length);
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* block " + block
+                + " is moved from neededReplications to pendingReplications");
+          }
+
+          // remove from neededReplications
+          if(numEffectiveReplicas + targets.length >= requiredReplication) {
             neededReplications.remove(block, priority); // remove from neededReplications
             replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
           }
         }
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
 
-        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-             (!blockHasEnoughRacks(block)) ) {
-          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
-            //No use continuing, unless a new rack in this case
-            return false;
-          }
-        }
-
-        // Add block to the to be replicated list
-        srcNode.addBlockToBeReplicated(block, targets);
-
-        for (DatanodeDescriptor dn : targets) {
-          dn.incBlocksScheduled();
-        }
-
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        pendingReplications.add(block, targets.length);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "BLOCK* block " + block
-              + " is moved from neededReplications to pendingReplications");
-        }
-
-        // remove from neededReplications
-        if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-        }
-        if (NameNode.stateChangeLog.isInfoEnabled()) {
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
           for (int k = 0; k < targets.length; k++) {
             targetList.append(' ');
             targetList.append(targets[k].getName());
           }
           NameNode.stateChangeLog.info(
-              "BLOCK* ask "
-              + srcNode.getName() + " to replicate "
-              + block + " to " + targetList);
-          if(NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "BLOCK* neededReplications = " + neededReplications.size()
-                + " pendingReplications = " + pendingReplications.size());
-          }
+              "BLOCK* ask "
+              + rw.srcNode.getName() + " to replicate "
+              + rw.block + " to " + targetList);
         }
       }
-    } finally {
-      namesystem.writeUnlock();
+    }
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug(
+          "BLOCK* neededReplications = " + neededReplications.size()
+          + " pendingReplications = " + pendingReplications.size());
     }
-    return true;
+    return scheduledWork;
   }
 
   /**
@@ -2684,4 +2707,34 @@ public class BlockManager {
     return workFound;
   }
 
+  private static class ReplicationWork {
+
+    private Block block;
+    private INodeFile fileINode;
+
+    private DatanodeDescriptor srcNode;
+    private List<DatanodeDescriptor> containingNodes;
+    private List<DatanodeDescriptor> liveReplicaNodes;
+    private int additionalReplRequired;
+
+    private DatanodeDescriptor targets[];
+    private int priority;
+
+    public ReplicationWork(Block block,
+        INodeFile fileINode,
+        DatanodeDescriptor srcNode,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeDescriptor> liveReplicaNodes,
+        int additionalReplRequired,
+        int priority) {
+      this.block = block;
+      this.fileINode = fileINode;
+      this.srcNode = srcNode;
+      this.containingNodes = containingNodes;
+      this.liveReplicaNodes = liveReplicaNodes;
+      this.additionalReplRequired = additionalReplRequired;
+      this.priority = priority;
+      this.targets = null;
+    }
+  }
 }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 44d733df5ae..dbecfe7f783 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -355,25 +356,35 @@ public class TestBlockManager {
     bm.blocksMap.addINode(blockInfo, iNode);
     return blockInfo;
   }
-  
+
   private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
-    assertEquals("Block not initially pending replication",
-        0, bm.pendingReplications.getNumReplicas(block));
-    assertTrue("computeReplicationWork should indicate replication is needed",
-        bm.computeReplicationWorkForBlock(block, 1));
+    // list for priority 1
+    List<Block> list_p1 = new ArrayList<Block>();
+    list_p1.add(block);
+
+    // list of lists for each priority
+    List<List<Block>> list_all = new ArrayList<List<Block>>();
+    list_all.add(new ArrayList<Block>()); // for priority 0
+    list_all.add(list_p1); // for priority 1
+
+    assertEquals("Block not initially pending replication", 0,
+        bm.pendingReplications.getNumReplicas(block));
+    assertEquals(
+        "computeReplicationWork should indicate replication is needed", 1,
+        bm.computeReplicationWorkForBlocks(list_all));
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
-    
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
-        getAllPendingReplications();
+
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
     assertEquals(1, repls.size());
-    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries()
+        .iterator().next();
     DatanodeDescriptor[] targets = repl.getValue().targets;
-    
+
     DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
     pipeline[0] = repl.getKey();
     System.arraycopy(targets, 0, pipeline, 1, targets.length);
-    
+
     return pipeline;
   }
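
Note on the pattern this patch introduces: the old code acquired and released the
namesystem write lock once per under-replicated block, while the new
computeReplicationWorkForBlocks() batches the work in three phases: (1) under the
write lock, snapshot the blocks to process into a list of ReplicationWork items;
(2) with no lock held, run the expensive placement computation (chooseTarget) for
each item; (3) re-acquire the lock once and commit the batch, re-validating every
block because namespace state may have changed while the lock was released. The
stand-alone Java sketch below illustrates the same pattern under stated
assumptions; it is not HDFS code, and the names (Scheduler, WorkItem,
computeTarget) are illustrative only, not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Scheduler {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<String> pendingBlocks = new ArrayList<String>();

  // One unit of deferred work, analogous to the patch's ReplicationWork.
  private static class WorkItem {
    final String block;
    String target; // filled in outside the lock
    WorkItem(String block) { this.block = block; }
  }

  // Stand-in for the expensive placement decision (chooseTarget in the patch).
  private String computeTarget(String block) {
    return "datanode-" + Math.abs(block.hashCode() % 3);
  }

  public int schedule() {
    List<WorkItem> work = new ArrayList<WorkItem>();

    // Phase 1: snapshot the queue under the write lock.
    lock.writeLock().lock();
    try {
      for (String b : pendingBlocks) {
        work.add(new WorkItem(b));
      }
    } finally {
      lock.writeLock().unlock();
    }

    // Phase 2: compute placements with no lock held, so other
    // operations are not blocked by this loop.
    for (WorkItem w : work) {
      w.target = computeTarget(w.block);
    }

    // Phase 3: re-acquire the lock once and commit the whole batch,
    // re-validating each item because state may have changed in between.
    int scheduled = 0;
    lock.writeLock().lock();
    try {
      for (WorkItem w : work) {
        if (w.target == null || !pendingBlocks.remove(w.block)) {
          continue; // no longer pending; skip stale work
        }
        scheduled++;
      }
    } finally {
      lock.writeLock().unlock();
    }
    return scheduled;
  }

  public static void main(String[] args) {
    Scheduler s = new Scheduler();
    s.pendingBlocks.add("blk_1");
    s.pendingBlocks.add("blk_2");
    System.out.println("scheduled " + s.schedule() + " block(s)");
  }
}

The re-validation in phase 3 mirrors the patch's "Recheck since global lock was
released" comment: without it, work computed from a stale snapshot could be
committed for a block that was deleted or already scheduled in the meantime.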