From 8b32d3792934507c774997cd82dc061b75410f83 Mon Sep 17 00:00:00 2001
From: binlijin
Date: Wed, 29 Nov 2017 18:42:14 +0800
Subject: [PATCH] HBASE-19290 Reduce zk request when doing split log

---
 .../ZkSplitLogWorkerCoordination.java         | 78 ++++++++++++-------
 1 file changed, 51 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index e64907c2532..0540a8f8580 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.coordination;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -199,27 +200,28 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
    * try to grab a 'lock' on the task zk node to own and execute the task.
    * <p>
    * @param path zk node for the task
+   * @return true if the task is successfully grabbed, otherwise false
    */
-  private void grabTask(String path) {
+  private boolean grabTask(String path) {
     Stat stat = new Stat();
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
       workerInGrabTask = true;
       if (Thread.interrupted()) {
-        return;
+        return false;
       }
     }
     try {
       try {
         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
-          return;
+          return false;
         }
       } catch (KeeperException e) {
         LOG.warn("Failed to get data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       SplitLogTask slt;
       try {
@@ -227,18 +229,18 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
       } catch (DeserializationException e) {
         LOG.warn("Failed parse data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
-        return;
+        return false;
       }
 
       currentVersion =
         attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
       if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
-        return;
+        return false;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -249,7 +251,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
 
         endTask(new SplitLogTask.Done(server.getServerName()),
           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
-        return;
+        return false;
       }
 
       LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -266,6 +268,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
         LOG.warn("Interrupted while yielding for other region servers", e);
         Thread.currentThread().interrupt();
       }
+      return true;
     } finally {
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
@@ -316,12 +319,13 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
   }
 
   /**
-   * This function calculates how many splitters it could create based on expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
+   * This function calculates how many splitters this RS should create based on expected average
+   * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
    * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks current total number of available tasks
+   * @param numTasks total number of split tasks available
+   * @return number of tasks this RS can grab
    */
-  private int calculateAvailableSplitters(int numTasks) {
+  private int getNumExpectedTasksPerRS(int numTasks) {
     // at lease one RS(itself) available
     int availableRSs = 1;
     try {
@@ -332,12 +336,17 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
       // do nothing
       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
     }
-
     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
-    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
-    // calculate how many more splitters we could spawn
-    return Math.min(expectedTasksPerRS, maxConcurrentTasks)
-        - this.tasksInProgress.get();
+    return Math.max(1, expectedTasksPerRS); // at least be one
+  }
+
+  /**
+   * @param expectedTasksPerRS Average number of tasks to be handled by each RS
+   * @return true if more splitters are available, otherwise false.
+   */
+  private boolean areSplittersAvailable(int expectedTasksPerRS) {
+    return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
+        - this.tasksInProgress.get()) > 0;
   }
 
   /**
@@ -406,8 +415,11 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
             + " ... worker thread exiting.");
         return;
       }
+      // shuffle the paths to prevent different split log workers from starting from the same
+      // log file after the meta log (if any)
+      Collections.shuffle(paths);
       // pick meta wal firstly
-      int offset = (int) (Math.random() * paths.size());
+      int offset = 0;
       for (int i = 0; i < paths.size(); i++) {
         if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
           offset = i;
@@ -415,21 +427,33 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
         }
       }
       int numTasks = paths.size();
+      int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
+      boolean taskGrabbed = false;
       for (int i = 0; i < numTasks; i++) {
-        int idx = (i + offset) % paths.size();
-        // don't call ZKSplitLog.getNodeName() because that will lead to
-        // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx)));
-        } else {
-          LOG.debug("Current region server " + server.getServerName() + " has "
-              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
-          break;
+        while (!shouldStop) {
+          if (this.areSplittersAvailable(expectedTasksPerRS)) {
+            LOG.debug("Current region server " + server.getServerName()
+                + " is ready to take more tasks, will get task list and try to grab tasks again.");
+            int idx = (i + offset) % paths.size();
+            // don't call ZKSplitLog.getNodeName() because that will lead to
+            // double encoding of the path name
+            taskGrabbed |= grabTask(ZNodePaths.joinZNode(
+                watcher.znodePaths.splitLogZNode, paths.get(idx)));
+            break;
+          } else {
+            LOG.debug("Current region server " + server.getServerName() + " has "
+                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+            Thread.sleep(100);
+          }
        }
        if (shouldStop) {
          return;
        }
      }
+      if (!taskGrabbed && !shouldStop) {
+        // did not grab any task, sleep a little bit to reduce zk requests.
+        Thread.sleep(1000);
+      }
      SplitLogCounters.tot_wkr_task_grabing.increment();
      synchronized (taskReadySeq) {
        while (seq_start == taskReadySeq.get()) {
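
Reviewer note: the standalone sketch below is not part of the patch; it only isolates the throttling arithmetic that the new getNumExpectedTasksPerRS and areSplittersAvailable helpers apply before each grabTask call. The class name, the sample numbers, and the config key mentioned in the comment are illustrative assumptions.

import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch of the split-task throttling introduced by HBASE-19290; not HBase code. */
public class SplitterThrottleSketch {
  // Hard upper bound on concurrent splitters; in HBase this comes from configuration
  // (assumed here to be the "hbase.regionserver.wal.max.splitters" setting).
  private final int maxConcurrentTasks;
  // Number of split tasks this region server is currently working on.
  final AtomicInteger tasksInProgress = new AtomicInteger(0);

  SplitterThrottleSketch(int maxConcurrentTasks) {
    this.maxConcurrentTasks = maxConcurrentTasks;
  }

  // Ceiling of numTasks / availableRSs, but at least 1 (same formula as getNumExpectedTasksPerRS).
  int numExpectedTasksPerRS(int numTasks, int availableRSs) {
    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
    return Math.max(1, expectedTasksPerRS);
  }

  // True while this RS holds fewer tasks than MIN(expected tasks per RS, hard upper bound),
  // i.e. the same check as areSplittersAvailable.
  boolean splittersAvailable(int expectedTasksPerRS) {
    return (Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress.get()) > 0;
  }

  public static void main(String[] args) {
    SplitterThrottleSketch rs = new SplitterThrottleSketch(2); // hard limit of 2 concurrent tasks
    int expected = rs.numExpectedTasksPerRS(10, 4);            // 10 tasks over 4 RSs -> ceil = 3
    while (rs.splittersAvailable(expected)) {
      rs.tasksInProgress.incrementAndGet();                    // pretend a task was grabbed
    }
    // MIN(3, 2) = 2, so this worker stops asking for more work after two tasks.
    System.out.println("tasks grabbed: " + rs.tasksInProgress.get()); // prints: tasks grabbed: 2
  }
}

Once a worker reaches MIN(expected per-RS share, maxConcurrentTasks), the reworked taskLoop above sleeps (100 ms while busy, 1000 ms when nothing was grabbed) instead of immediately hammering ZooKeeper with getData/ownership attempts, which appears to be the "reduce zk request" effect the subject line refers to.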