HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to pick up all of the split work it is capable of (#377)
Signed-off-by: Xu Cang <xcang@apache.org>
This commit is contained in:
parent
25f9070bc5
commit
19d8e33798
|
@ -324,34 +324,10 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function calculates how many splitters this RS should create based on expected average
|
|
||||||
* tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
|
|
||||||
* At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
|
|
||||||
* @param numTasks total number of split tasks available
|
|
||||||
* @return number of tasks this RS can grab
|
|
||||||
*/
|
|
||||||
private int getNumExpectedTasksPerRS(int numTasks) {
|
|
||||||
// at lease one RS(itself) available
|
|
||||||
int availableRSs = 1;
|
|
||||||
try {
|
|
||||||
List<String> regionServers =
|
|
||||||
ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode);
|
|
||||||
availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
// do nothing
|
|
||||||
LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
|
|
||||||
}
|
|
||||||
int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
|
|
||||||
return Math.max(1, expectedTasksPerRS); // at least be one
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param expectedTasksPerRS Average number of tasks to be handled by each RS
|
|
||||||
* @return true if more splitters are available, otherwise false.
|
* @return true if more splitters are available, otherwise false.
|
||||||
*/
|
*/
|
||||||
private boolean areSplittersAvailable(int expectedTasksPerRS) {
|
private boolean areSplittersAvailable() {
|
||||||
return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
|
return maxConcurrentTasks - tasksInProgress.get() > 0;
|
||||||
- this.tasksInProgress.get()) > 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -432,13 +408,14 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int numTasks = paths.size();
|
int numTasks = paths.size();
|
||||||
int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
|
|
||||||
boolean taskGrabbed = false;
|
boolean taskGrabbed = false;
|
||||||
for (int i = 0; i < numTasks; i++) {
|
for (int i = 0; i < numTasks; i++) {
|
||||||
while (!shouldStop) {
|
while (!shouldStop) {
|
||||||
if (this.areSplittersAvailable(expectedTasksPerRS)) {
|
if (this.areSplittersAvailable()) {
|
||||||
LOG.debug("Current region server " + server.getServerName()
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Current region server " + server.getServerName()
|
||||||
+ " is ready to take more tasks, will get task list and try grab tasks again.");
|
+ " is ready to take more tasks, will get task list and try grab tasks again.");
|
||||||
|
}
|
||||||
int idx = (i + offset) % paths.size();
|
int idx = (i + offset) % paths.size();
|
||||||
// don't call ZKSplitLog.getNodeName() because that will lead to
|
// don't call ZKSplitLog.getNodeName() because that will lead to
|
||||||
// double encoding of the path name
|
// double encoding of the path name
|
||||||
|
@ -446,8 +423,10 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
|
||||||
watcher.getZNodePaths().splitLogZNode, paths.get(idx)));
|
watcher.getZNodePaths().splitLogZNode, paths.get(idx)));
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Current region server " + server.getServerName() + " has "
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Current region server " + server.getServerName() + " has "
|
||||||
+ this.tasksInProgress.get() + " tasks in progress and can't take more.");
|
+ this.tasksInProgress.get() + " tasks in progress and can't take more.");
|
||||||
|
}
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -472,53 +472,6 @@ public class TestSplitLogWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
|
|
||||||
* RS
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
|
|
||||||
LOG.info("testAcquireMultiTasks");
|
|
||||||
SplitLogCounters.resetCounters();
|
|
||||||
final String TATAS = "tatas";
|
|
||||||
final ServerName RS = ServerName.valueOf("rs,1,1");
|
|
||||||
final ServerName RS2 = ServerName.valueOf("rs,1,2");
|
|
||||||
final int maxTasks = 3;
|
|
||||||
Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
|
||||||
testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
|
|
||||||
RegionServerServices mockedRS = getRegionServer(RS);
|
|
||||||
|
|
||||||
// create two RS nodes
|
|
||||||
String rsPath = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, RS.getServerName());
|
|
||||||
zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
||||||
rsPath = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, RS2.getServerName());
|
|
||||||
zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
|
|
||||||
|
|
||||||
for (int i = 0; i < maxTasks; i++) {
|
|
||||||
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
|
|
||||||
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
|
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
|
|
||||||
slw.start();
|
|
||||||
try {
|
|
||||||
int acquiredTasks = 0;
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
|
|
||||||
for (int i = 0; i < maxTasks; i++) {
|
|
||||||
byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
|
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
|
||||||
if (slt.isOwned(RS)) {
|
|
||||||
acquiredTasks++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertEquals(2, acquiredTasks);
|
|
||||||
} finally {
|
|
||||||
stopSplitLogWorker(slw);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a mocked region server service instance
|
* Create a mocked region server service instance
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue