HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to pick up all of the split work it is capable of

Andrew Purtell 2019-07-12 12:26:00 -07:00
parent a3dc5bcde1
commit 4d0378629b
2 changed files with 9 additions and 77 deletions
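
An explanatory note (not part of the commit message): before this change, ZkSplitLogWorkerCoordination estimated an "expected tasks per RS" by dividing the outstanding split tasks across the live regionserver znodes, and a worker stopped claiming work once it reached MIN(that average, maxConcurrentTasks). If the other regionservers were slow or never claimed their share, tasks sat unassigned even though this worker had idle splitter capacity. After the change, a worker keeps grabbing tasks until it alone hits the configured hard limit (hbase.regionserver.wal.max.splitters).

A minimal standalone sketch contrasting the two gates. The constants mirror the removed code and the removed test's scenario below; the demo class itself is hypothetical and not part of the patch:

  // Hypothetical demo, not the patched class: contrasts the old and new
  // availability checks using the scenario from the deleted test.
  public class SplitterGateDemo {
    public static void main(String[] args) {
      int maxConcurrentTasks = 3; // hbase.regionserver.wal.max.splitters
      int numTasks = 3;           // outstanding split tasks in ZK
      int availableRSs = 2;       // live regionserver znodes
      int tasksInProgress = 2;    // splitters this RS is already running

      // Old gate: cap at ceil(numTasks / availableRSs), per the removed helper.
      int expectedTasksPerRS = Math.max(1,
          (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1)); // = 2
      boolean oldGate =
          (Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress) > 0;

      // New gate: only the configured hard limit matters.
      boolean newGate = (maxConcurrentTasks - tasksInProgress) > 0;

      // Prints old=false new=true: the old gate strands the third task on the
      // assumption another RS will take it; the new gate lets this RS grab it.
      System.out.println("old=" + oldGate + " new=" + newGate);
    }
  }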


@@ -328,34 +328,10 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
   }
 
   /**
-   * This function calculates how many splitters this RS should create based on expected average
-   * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
-   * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks total number of split tasks available
-   * @return number of tasks this RS can grab
-   */
-  private int getNumExpectedTasksPerRS(int numTasks) {
-    // at lease one RS(itself) available
-    int availableRSs = 1;
-    try {
-      List<String> regionServers =
-          ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
-      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
-    } catch (KeeperException e) {
-      // do nothing
-      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
-    }
-    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
-    return Math.max(1, expectedTasksPerRS); // at least be one
-  }
-
-  /**
-   * @param expectedTasksPerRS Average number of tasks to be handled by each RS
    * @return true if more splitters are available, otherwise false.
    */
-  private boolean areSplittersAvailable(int expectedTasksPerRS) {
-    return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
-        - this.tasksInProgress.get()) > 0;
+  private boolean areSplittersAvailable() {
+    return maxConcurrentTasks - tasksInProgress.get() > 0;
   }
 
   /**
@@ -436,13 +412,14 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
         }
       }
       int numTasks = paths.size();
-      int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
       boolean taskGrabbed = false;
       for (int i = 0; i < numTasks; i++) {
         while (!shouldStop) {
-          if (this.areSplittersAvailable(expectedTasksPerRS)) {
-            LOG.debug("Current region server " + server.getServerName()
+          if (this.areSplittersAvailable()) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Current region server " + server.getServerName()
                 + " is ready to take more tasks, will get task list and try grab tasks again.");
+            }
             int idx = (i + offset) % paths.size();
             // don't call ZKSplitLog.getNodeName() because that will lead to
             // double encoding of the path name
@@ -450,9 +427,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
               paths.get(idx)));
             break;
           } else {
-            LOG.debug("Current region server " + server.getServerName()
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Current region server " + server.getServerName()
                 + " has " + this.tasksInProgress.get()
                 + " tasks in progress and can't take more.");
+            }
             Thread.sleep(100);
           }
         }


@@ -455,53 +455,6 @@ public class TestSplitLogWorker {
     }
   }
 
-  /**
-   * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
-   * RS
-   * @throws Exception
-   */
-  @Test(timeout=60000)
-  public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
-    LOG.info("testAcquireMultiTasks");
-    SplitLogCounters.resetCounters();
-    final String TATAS = "tatas";
-    final ServerName RS = ServerName.valueOf("rs,1,1");
-    final ServerName RS2 = ServerName.valueOf("rs,1,2");
-    final int maxTasks = 3;
-    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
-    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
-    RegionServerServices mockedRS = getRegionServer(RS);
-
-    // create two RS nodes
-    String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
-    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
-    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-
-    for (int i = 0; i < maxTasks; i++) {
-      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-
-    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
-    slw.start();
-    try {
-      int acquiredTasks = 0;
-      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
-      for (int i = 0; i < maxTasks; i++) {
-        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
-        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
-        if (slt.isOwned(RS)) {
-          acquiredTasks++;
-        }
-      }
-      assertEquals(2, acquiredTasks);
-    } finally {
-      stopSplitLogWorker(slw);
-    }
-  }
-
   /**
    * Create a mocked region server service instance
    * @param server
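
The deleted test pinned the old averaging behavior: with three unassigned tasks and two registered regionserver znodes, it expected this worker to stop at ceil(3/2) = 2 tasks, hence assertEquals(2, acquiredTasks). Under the new gate that assertion no longer holds, since a single worker may claim all three tasks, so the commit removes the test without a replacement. Purely as an illustration, a variant that would pin the new behavior might look like the following; it is hypothetical (not part of this commit) and reuses the deleted test's scaffolding (zkw, ds, neverEndingTask, waitForCounter, getRegionServer, WAIT_TIME):

  // Hypothetical test, not in this commit: a lone worker should claim every
  // task up to hbase.regionserver.wal.max.splitters now that the per-RS
  // averaging is gone. Scaffolding mirrors the deleted test.
  @Test(timeout=60000)
  public void testAcquireAllTasksUpToMaxSplitters() throws Exception {
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    final int maxTasks = 3;
    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
    RegionServerServices mockedRS = getRegionServer(RS);
    for (int i = 0; i < maxTasks; i++) {
      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
    slw.start();
    try {
      // Expect the acquired-task counter to reach maxTasks, not ceil(tasks/RSs).
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
      int acquiredTasks = 0;
      for (int i = 0; i < maxTasks; i++) {
        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
        if (slt.isOwned(RS)) {
          acquiredTasks++;
        }
      }
      assertEquals(maxTasks, acquiredTasks);
    } finally {
      stopSplitLogWorker(slw);
    }
  }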