svn merge -c 1532932 from trunk for HDFS-4376. Fix race conditions in Balancer.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1532933 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-10-16 23:06:46 +00:00
parent 186581fef2
commit 0b18a51c33
2 changed files with 29 additions and 11 deletions

View File

@ -108,6 +108,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5283. Under construction blocks only inside snapshots should not be HDFS-5283. Under construction blocks only inside snapshots should not be
counted in safemode threshhold. (Vinay via szetszwo) counted in safemode threshhold. (Vinay via szetszwo)
HDFS-4376. Fix race conditions in Balancer. (Junping Du via szetszwo)
Release 2.2.1 - UNRELEASED Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -506,7 +506,7 @@ public class Balancer {
final DatanodeInfo datanode; final DatanodeInfo datanode;
final double utilization; final double utilization;
final long maxSize2Move; final long maxSize2Move;
protected long scheduledSize = 0L; private long scheduledSize = 0L;
// blocks being moved but not confirmed yet // blocks being moved but not confirmed yet
private List<PendingBlockMove> pendingBlocks = private List<PendingBlockMove> pendingBlocks =
new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
@ -555,20 +555,35 @@ public class Balancer {
} }
/** Decide if still need to move more bytes */ /** Decide if still need to move more bytes */
protected boolean hasSpaceForScheduling() { protected synchronized boolean hasSpaceForScheduling() {
return scheduledSize<maxSize2Move; return scheduledSize<maxSize2Move;
} }
/** Return the total number of bytes that need to be moved */ /** Return the total number of bytes that need to be moved */
protected long availableSizeToMove() { protected synchronized long availableSizeToMove() {
return maxSize2Move-scheduledSize; return maxSize2Move-scheduledSize;
} }
/* increment scheduled size */ /** increment scheduled size */
protected void incScheduledSize(long size) { protected synchronized void incScheduledSize(long size) {
scheduledSize += size; scheduledSize += size;
} }
/** decrement scheduled size */
protected synchronized void decScheduledSize(long size) {
scheduledSize -= size;
}
/** get scheduled size */
protected synchronized long getScheduledSize(){
return scheduledSize;
}
/** get scheduled size */
protected synchronized void setScheduledSize(long size){
scheduledSize = size;
}
/* Check if the node can schedule more blocks to move */ /* Check if the node can schedule more blocks to move */
synchronized private boolean isPendingQNotFull() { synchronized private boolean isPendingQNotFull() {
if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) { if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
@ -702,8 +717,8 @@ public class Balancer {
pendingBlock.source = this; pendingBlock.source = this;
pendingBlock.target = target; pendingBlock.target = target;
if ( pendingBlock.chooseBlockAndProxy() ) { if ( pendingBlock.chooseBlockAndProxy() ) {
long blockSize = pendingBlock.block.getNumBytes(); long blockSize = pendingBlock.block.getNumBytes();
scheduledSize -= blockSize; decScheduledSize(blockSize);
task.size -= blockSize; task.size -= blockSize;
if (task.size == 0) { if (task.size == 0) {
tasks.remove(); tasks.remove();
@ -747,10 +762,11 @@ public class Balancer {
private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
private void dispatchBlocks() { private void dispatchBlocks() {
long startTime = Time.now(); long startTime = Time.now();
long scheduledSize = getScheduledSize();
this.blocksToReceive = 2*scheduledSize; this.blocksToReceive = 2*scheduledSize;
boolean isTimeUp = false; boolean isTimeUp = false;
int noPendingBlockIteration = 0; int noPendingBlockIteration = 0;
while(!isTimeUp && scheduledSize>0 && while(!isTimeUp && getScheduledSize()>0 &&
(!srcBlockList.isEmpty() || blocksToReceive>0)) { (!srcBlockList.isEmpty() || blocksToReceive>0)) {
PendingBlockMove pendingBlock = chooseNextBlockToMove(); PendingBlockMove pendingBlock = chooseNextBlockToMove();
if (pendingBlock != null) { if (pendingBlock != null) {
@ -779,7 +795,7 @@ public class Balancer {
// in case no blocks can be moved for source node's task, // in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations. // jump out of while-loop after 5 iterations.
if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
scheduledSize = 0; setScheduledSize(0);
} }
} }
@ -992,7 +1008,7 @@ public class Balancer {
long bytesToMove = 0L; long bytesToMove = 0L;
for (Source src : sources) { for (Source src : sources) {
bytesToMove += src.scheduledSize; bytesToMove += src.getScheduledSize();
} }
return bytesToMove; return bytesToMove;
} }
@ -1093,7 +1109,7 @@ public class Balancer {
bytesMoved += bytes; bytesMoved += bytes;
} }
private long get() { private synchronized long get() {
return bytesMoved; return bytesMoved;
} }
}; };