HBASE-19290 Reduce zk request when doing split log
parent b4a4be65ea
commit 8b32d37929
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.coordination;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
@@ -199,27 +200,28 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
    * try to grab a 'lock' on the task zk node to own and execute the task.
    * <p>
    * @param path zk node for the task
+   * @return boolean value when grab a task success return true otherwise false
    */
-  private void grabTask(String path) {
+  private boolean grabTask(String path) {
     Stat stat = new Stat();
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
       workerInGrabTask = true;
       if (Thread.interrupted()) {
-        return;
+        return false;
       }
     }
     try {
       try {
         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
-          return;
+          return false;
         }
       } catch (KeeperException e) {
         LOG.warn("Failed to get data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       SplitLogTask slt;
       try {
@@ -227,18 +229,18 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
       } catch (DeserializationException e) {
         LOG.warn("Failed parse data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
-        return;
+        return false;
       }
 
       currentVersion =
           attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
       if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
-        return;
+        return false;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -249,7 +251,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
 
         endTask(new SplitLogTask.Done(server.getServerName()),
           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
-        return;
+        return false;
       }
 
       LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -266,6 +268,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
         LOG.warn("Interrupted while yielding for other region servers", e);
         Thread.currentThread().interrupt();
       }
+      return true;
     } finally {
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
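The hunks above change grabTask from void to boolean so the worker loop can tell whether a task was actually acquired. A minimal, self-contained sketch of the aggregation pattern that return value enables; tryGrabTask and the sample paths are illustrative placeholders, not the real HBase API.

import java.util.Arrays;
import java.util.List;

public class GrabResultSketch {
  // Placeholder standing in for ZkSplitLogWorkerCoordination.grabTask(path):
  // returns true only when this worker actually acquires the task znode.
  private static boolean tryGrabTask(String path) {
    return path.endsWith("-2"); // stand-in for the real ZooKeeper interaction
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("wal-1", "wal-2", "wal-3");
    boolean taskGrabbed = false;
    for (String path : paths) {
      // |= keeps taskGrabbed true once any single grab succeeds,
      // mirroring taskGrabbed |= grabTask(...) in the patched loop
      taskGrabbed |= tryGrabTask(path);
    }
    System.out.println("grabbed at least one task: " + taskGrabbed);
  }
}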
@@ -316,12 +319,13 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
   }
 
   /**
-   * This function calculates how many splitters it could create based on expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
+   * This function calculates how many splitters this RS should create based on expected average
+   * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
    * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks current total number of available tasks
+   * @param numTasks total number of split tasks available
+   * @return number of tasks this RS can grab
    */
-  private int calculateAvailableSplitters(int numTasks) {
+  private int getNumExpectedTasksPerRS(int numTasks) {
     // at lease one RS(itself) available
     int availableRSs = 1;
     try {
@@ -332,12 +336,17 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
       // do nothing
       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
     }
 
     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
-    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
-    // calculate how many more splitters we could spawn
-    return Math.min(expectedTasksPerRS, maxConcurrentTasks)
-        - this.tasksInProgress.get();
+    return Math.max(1, expectedTasksPerRS); // at least be one
   }
 
+  /**
+   * @param expectedTasksPerRS Average number of tasks to be handled by each RS
+   * @return true if more splitters are available, otherwise false.
+   */
+  private boolean areSplittersAvailable(int expectedTasksPerRS) {
+    return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
+        - this.tasksInProgress.get()) > 0;
+  }
+
   /**
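The two hunks above replace calculateAvailableSplitters with getNumExpectedTasksPerRS (a ceiling division of the available tasks over the live region servers, floored at one) plus areSplittersAvailable (true while MIN(expected tasks per RS, maxConcurrentTasks) still exceeds the tasks already in progress). A small self-contained re-derivation of that arithmetic, with assumed example numbers:

public class SplitterMathSketch {
  // Mirrors getNumExpectedTasksPerRS: ceiling of numTasks / availableRSs, at least 1.
  static int expectedTasksPerRS(int numTasks, int availableRSs) {
    int expected = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
    return Math.max(1, expected);
  }

  // Mirrors areSplittersAvailable: capacity left under the hard cap after in-flight tasks.
  static boolean splittersAvailable(int expectedTasksPerRS, int maxConcurrentTasks,
      int tasksInProgress) {
    return (Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress) > 0;
  }

  public static void main(String[] args) {
    // Assumed example: 10 split tasks, 4 live region servers, hard cap of 3 concurrent tasks.
    int expected = expectedTasksPerRS(10, 4);              // ceil(10 / 4) = 3
    System.out.println("expected tasks per RS: " + expected);
    System.out.println("room with 2 in flight: " + splittersAvailable(expected, 3, 2)); // true
    System.out.println("room with 3 in flight: " + splittersAvailable(expected, 3, 3)); // false
  }
}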
@@ -406,8 +415,11 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
             + " ... worker thread exiting.");
         return;
       }
+      // shuffle the paths to prevent different split log worker start from the same log file after
+      // meta log (if any)
+      Collections.shuffle(paths);
       // pick meta wal firstly
-      int offset = (int) (Math.random() * paths.size());
+      int offset = 0;
       for (int i = 0; i < paths.size(); i++) {
         if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
           offset = i;
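The hunk above drops the random starting offset in favour of shuffling the whole task list and then rotating the iteration so the meta WAL, if present, is processed first. A minimal sketch of that ordering; the isMetaFile check here is a toy stand-in for AbstractFSWALProvider.isMetaFile:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetaFirstOrderSketch {
  // Toy stand-in for AbstractFSWALProvider.isMetaFile(path).
  static boolean isMetaFile(String path) {
    return path.contains(".meta");
  }

  public static void main(String[] args) {
    List<String> paths = new ArrayList<>(Arrays.asList("wal-a", "wal-b.meta", "wal-c", "wal-d"));
    // Shuffle so different workers do not all start from the same WAL ...
    Collections.shuffle(paths);
    // ... then locate the meta WAL so it is still handled first.
    int offset = 0;
    for (int i = 0; i < paths.size(); i++) {
      if (isMetaFile(paths.get(i))) {
        offset = i;
        break;
      }
    }
    // Iterate from the offset and wrap around, as the worker loop does with (i + offset) % size.
    for (int i = 0; i < paths.size(); i++) {
      System.out.println(paths.get((i + offset) % paths.size()));
    }
  }
}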
@@ -415,21 +427,33 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
         }
       }
       int numTasks = paths.size();
+      int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
+      boolean taskGrabbed = false;
       for (int i = 0; i < numTasks; i++) {
+        while (!shouldStop) {
+          if (this.areSplittersAvailable(expectedTasksPerRS)) {
+            LOG.debug("Current region server " + server.getServerName()
+                + " is ready to take more tasks, will get task list and try grab tasks again.");
             int idx = (i + offset) % paths.size();
             // don't call ZKSplitLog.getNodeName() because that will lead to
             // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx)));
+            taskGrabbed |= grabTask(ZNodePaths.joinZNode(
+                watcher.znodePaths.splitLogZNode, paths.get(idx)));
+            break;
           } else {
             LOG.debug("Current region server " + server.getServerName() + " has "
                 + this.tasksInProgress.get() + " tasks in progress and can't take more.");
-          break;
+            Thread.sleep(100);
+          }
         }
         if (shouldStop) {
           return;
         }
       }
+      if (!taskGrabbed && !shouldStop) {
+        // do not grab any tasks, sleep a little bit to reduce zk request.
+        Thread.sleep(1000);
+      }
       SplitLogCounters.tot_wkr_task_grabing.increment();
       synchronized (taskReadySeq) {
         while (seq_start == taskReadySeq.get()) {
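The last hunk is where the ZooKeeper traffic actually drops: instead of returning to ZooKeeper for a fresh task list as soon as the worker is saturated, the loop now sleeps 100 ms and rechecks local capacity, and if a full pass grabs nothing it sleeps a further second before re-listing. A stand-alone sketch of that backoff shape, using placeholder suppliers in place of the real coordination and ZooKeeper calls:

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class BackoffLoopSketch {
  // listTasks, grabTask and splittersAvailable are placeholders for the ZK-backed
  // calls in the real worker; the real loop also checks a shouldStop flag.
  static void onePass(Supplier<List<String>> listTasks, Predicate<String> grabTask,
      Supplier<Boolean> splittersAvailable) throws InterruptedException {
    List<String> paths = listTasks.get();      // one task-list fetch per pass, not per retry
    boolean taskGrabbed = false;
    for (String path : paths) {
      while (true) {
        if (splittersAvailable.get()) {
          taskGrabbed |= grabTask.test(path);  // try the task once, then move on
          break;
        }
        Thread.sleep(100);                     // wait for local capacity; no ZooKeeper call
      }
    }
    if (!taskGrabbed) {
      Thread.sleep(1000);                      // nothing grabbed: back off before re-listing
    }
  }

  public static void main(String[] args) throws InterruptedException {
    onePass(() -> Arrays.asList("task-1", "task-2"), p -> p.endsWith("1"), () -> true);
    System.out.println("pass finished");
  }
}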