HDFS-4261. Fix bugs in Balaner causing infinite loop and TestBalancerWithNodeGroup timeing out. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430917 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-01-09 15:55:49 +00:00
parent 5f7d4d2b45
commit eae2a30462
4 changed files with 99 additions and 19 deletions

View File

@ -294,6 +294,9 @@ Trunk (Unreleased)
HDFS-4338. TestNameNodeMetrics#testCorruptBlock is flaky. (Andrew Wang via HDFS-4338. TestNameNodeMetrics#testCorruptBlock is flaky. (Andrew Wang via
atm) atm)
HDFS-4261. Fix bugs in Balaner causing infinite loop and
TestBalancerWithNodeGroup timeing out. (Junping Du via szetszwo)
Release 2.0.3-alpha - Unreleased Release 2.0.3-alpha - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -189,6 +189,7 @@ public class Balancer {
* balancing purpose at a datanode * balancing purpose at a datanode
*/ */
public static final int MAX_NUM_CONCURRENT_MOVES = 5; public static final int MAX_NUM_CONCURRENT_MOVES = 5;
public static final int MAX_NO_PENDING_BLOCK_INTERATIONS = 5;
private static final String USAGE = "Usage: java " private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName() + Balancer.class.getSimpleName()
@ -224,7 +225,6 @@ public class Balancer {
= new HashMap<String, BalancerDatanode>(); = new HashMap<String, BalancerDatanode>();
private NetworkTopology cluster; private NetworkTopology cluster;
final static private int MOVER_THREAD_POOL_SIZE = 1000; final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor = final private ExecutorService moverExecutor =
Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE); Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
@ -752,6 +752,7 @@ public class Balancer {
long startTime = Time.now(); long startTime = Time.now();
this.blocksToReceive = 2*scheduledSize; this.blocksToReceive = 2*scheduledSize;
boolean isTimeUp = false; boolean isTimeUp = false;
int noPendingBlockIteration = 0;
while(!isTimeUp && scheduledSize>0 && while(!isTimeUp && scheduledSize>0 &&
(!srcBlockList.isEmpty() || blocksToReceive>0)) { (!srcBlockList.isEmpty() || blocksToReceive>0)) {
PendingBlockMove pendingBlock = chooseNextBlockToMove(); PendingBlockMove pendingBlock = chooseNextBlockToMove();
@ -775,6 +776,14 @@ public class Balancer {
LOG.warn("Exception while getting block list", e); LOG.warn("Exception while getting block list", e);
return; return;
} }
} else {
// source node cannot find a pendingBlockToMove, iteration +1
noPendingBlockIteration++;
// in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations.
if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_INTERATIONS) {
scheduledSize = 0;
}
} }
// check if time is up or not // check if time is up or not
@ -801,8 +810,8 @@ public class Balancer {
*/ */
private static void checkReplicationPolicyCompatibility(Configuration conf private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException { ) throws UnsupportedActionException {
if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof
BlockPlacementPolicyDefault) { BlockPlacementPolicyDefault)) {
throw new UnsupportedActionException( throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault"); "Balancer without BlockPlacementPolicyDefault");
} }
@ -1085,7 +1094,6 @@ public class Balancer {
} }
}; };
private BytesMoved bytesMoved = new BytesMoved(); private BytesMoved bytesMoved = new BytesMoved();
private int notChangedIterations = 0;
/* Start a thread to dispatch block moves for each source. /* Start a thread to dispatch block moves for each source.
* The thread selects blocks to move & sends request to proxy source to * The thread selects blocks to move & sends request to proxy source to
@ -1384,19 +1392,10 @@ public class Balancer {
* available to move. * available to move.
* Exit no byte has been moved for 5 consecutive iterations. * Exit no byte has been moved for 5 consecutive iterations.
*/ */
if (dispatchBlockMoves() > 0) { if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
notChangedIterations = 0;
} else {
notChangedIterations++;
if (notChangedIterations >= 5) {
System.out.println(
"No block has been moved for 5 iterations. Exiting...");
return ReturnStatus.NO_MOVE_PROGRESS; return ReturnStatus.NO_MOVE_PROGRESS;
} }
}
// clean all lists
resetData(conf);
return ReturnStatus.IN_PROGRESS; return ReturnStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
@ -1445,6 +1444,8 @@ public class Balancer {
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf); final Balancer b = new Balancer(nnc, p, conf);
final ReturnStatus r = b.run(iteration, formatter, conf); final ReturnStatus r = b.run(iteration, formatter, conf);
// clean all lists
b.resetData(conf);
if (r == ReturnStatus.IN_PROGRESS) { if (r == ReturnStatus.IN_PROGRESS) {
done = false; done = false;
} else if (r != ReturnStatus.SUCCESS) { } else if (r != ReturnStatus.SUCCESS) {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.util.Daemon;
class NameNodeConnector { class NameNodeConnector {
private static final Log LOG = Balancer.LOG; private static final Log LOG = Balancer.LOG;
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_INTERATIONS = 5;
final URI nameNodeUri; final URI nameNodeUri;
final String blockpoolID; final String blockpoolID;
@ -65,6 +66,8 @@ class NameNodeConnector {
private final boolean encryptDataTransfer; private final boolean encryptDataTransfer;
private boolean shouldRun; private boolean shouldRun;
private long keyUpdaterInterval; private long keyUpdaterInterval;
// used for balancer
private int notChangedIterations = 0;
private BlockTokenSecretManager blockTokenSecretManager; private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey; private DataEncryptionKey encryptionKey;
@ -119,6 +122,20 @@ class NameNodeConnector {
} }
} }
boolean shouldContinue(long dispatchBlockMoveBytes) {
if (dispatchBlockMoveBytes > 0) {
notChangedIterations = 0;
} else {
notChangedIterations++;
if (notChangedIterations >= MAX_NOT_CHANGED_INTERATIONS) {
System.out.println("No block has been moved for "
+ notChangedIterations + " iterations. Exiting...");
return false;
}
}
return true;
}
/** Get an access token for a block. */ /** Get an access token for a block. */
Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
) throws IOException { ) throws IOException {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.junit.Test; import org.junit.Test;
import junit.framework.Assert;
/** /**
* This class tests if a balancer schedules tasks correctly. * This class tests if a balancer schedules tasks correctly.
@ -175,11 +176,24 @@ public class TestBalancerWithNodeGroup {
waitForBalancer(totalUsedSpace, totalCapacity); waitForBalancer(totalUsedSpace, totalCapacity);
} }
private void runBalancerCanFinish(Configuration conf,
long totalUsedSpace, long totalCapacity) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
(r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor.");
}
/** /**
* Create a cluster with even distribution, and a new empty node is added to * Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test rack locality for balancer policy. * the cluster, then test rack locality for balancer policy.
*/ */
@Test @Test(timeout=60000)
public void testBalancerWithRackLocality() throws Exception { public void testBalancerWithRackLocality() throws Exception {
Configuration conf = createConf(); Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY}; long[] capacities = new long[]{CAPACITY, CAPACITY};
@ -217,7 +231,7 @@ public class TestBalancerWithNodeGroup {
totalCapacity += newCapacity; totalCapacity += newCapacity;
// run balancer and validate results // run balancer and validate results
runBalancer(conf, totalUsedSpace, totalCapacity); runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
DatanodeInfo[] datanodeReport = DatanodeInfo[] datanodeReport =
client.getDatanodeReport(DatanodeReportType.ALL); client.getDatanodeReport(DatanodeReportType.ALL);
@ -245,7 +259,7 @@ public class TestBalancerWithNodeGroup {
* Create a cluster with even distribution, and a new empty node is added to * Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test node-group locality for balancer policy. * the cluster, then test node-group locality for balancer policy.
*/ */
@Test @Test(timeout=60000)
public void testBalancerWithNodeGroup() throws Exception { public void testBalancerWithNodeGroup() throws Exception {
Configuration conf = createConf(); Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY}; long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
@ -289,4 +303,49 @@ public class TestBalancerWithNodeGroup {
cluster.shutdown(); cluster.shutdown();
} }
} }
/**
* Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
* in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
* to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
* to replica placement policy with NodeGroup. As a result, n2 and n3 will be
* filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
* to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
* to end in 5 iterations without move block process.
*/
@Test(timeout=60000)
public void testBalancerEndInNoMoveProgress() throws Exception {
Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
int numOfDatanodes = capacities.length;
assertEquals(numOfDatanodes, racks.length);
assertEquals(numOfDatanodes, nodeGroups.length);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.racks(racks)
.simulatedCapacities(capacities);
MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
cluster = new MiniDFSClusterWithNodeGroup(builder);
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
long totalCapacity = TestBalancer.sum(capacities);
// fill up the cluster to be 60% full
long totalUsedSpace = totalCapacity * 6 / 10;
TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
(short) (3), 0);
// run balancer which can finish in 5 iterations with no block movement.
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
} finally {
cluster.shutdown();
}
}
} }