diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index e4a722407dd..0060ce155a3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -250,7 +250,15 @@ public enum EventType {
*
* RS_PARALLEL_SEEK
*/
- RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK);
+ RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK),
+
+ /**
+ * RS wal recovery work items(either creating recover.edits or directly replay wals)
+ * to be executed on the RS.
+ *
+ * RS_LOG_REPLAY
+ */
+ RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS);
private final int code;
private final ExecutorType executor;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index ccea4ff4034..45058aa3689 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -44,7 +44,8 @@ public enum ExecutorType {
RS_CLOSE_REGION (23),
RS_CLOSE_ROOT (24),
RS_CLOSE_META (25),
- RS_PARALLEL_SEEK (26);
+ RS_PARALLEL_SEEK (26),
+ RS_LOG_REPLAY_OPS (27);
ExecutorType(int value) {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ed2d48be0e6..7aaf17b6531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1583,6 +1583,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
}
+ this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
+ conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
uncaughtExceptionHandler);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 1c0e4226b30..5ae86428fbc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -25,23 +25,27 @@ import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -78,12 +82,17 @@ import org.apache.zookeeper.data.Stat;
*/
@InterfaceAudience.Private
public class SplitLogWorker extends ZooKeeperListener implements Runnable {
+ public static final int DEFAULT_MAX_SPLITTERS = 2;
+
private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
private static final int checkInterval = 5000; // 5 seconds
+ private static final int FAILED_TO_OWN_TASK = -1;
Thread worker;
private final ServerName serverName;
private final TaskExecutor splitTaskExecutor;
+ // thread pool which executes recovery work
+ private final ExecutorService executorService;
private final Object taskReadyLock = new Object();
volatile int taskReadySeq = 0;
@@ -95,9 +104,11 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
private final int report_period;
private RegionServerServices server = null;
private Configuration conf = null;
+ protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
+ private int maxConcurrentTasks = 0;
- public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
- RegionServerServices server, TaskExecutor splitTaskExecutor) {
+ public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
+ TaskExecutor splitTaskExecutor) {
super(watcher);
this.server = server;
this.serverName = server.getServerName();
@@ -105,16 +116,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
report_period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
this.conf = conf;
- }
-
- public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
- TaskExecutor splitTaskExecutor) {
- super(watcher);
- this.serverName = serverName;
- this.splitTaskExecutor = splitTaskExecutor;
- report_period = conf.getInt("hbase.splitlog.report.period",
- conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
- this.conf = conf;
+ this.executorService = this.server.getExecutorService();
+ this.maxConcurrentTasks =
+ conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
}
public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
@@ -238,11 +242,18 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
break;
}
}
- for (int i = 0; i < paths.size(); i ++) {
+ int numTasks = paths.size();
+ for (int i = 0; i < numTasks; i++) {
int idx = (i + offset) % paths.size();
// don't call ZKSplitLog.getNodeName() because that will lead to
// double encoding of the path name
- grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+ if (this.calculateAvailableSplitters(numTasks) > 0) {
+ grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+ } else {
+ LOG.debug("Current region server " + this.serverName + " has "
+ + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+ break;
+ }
if (exitWorker) {
return;
}
@@ -337,72 +348,33 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
return;
}
- currentVersion = stat.getVersion();
- if (!attemptToOwnTask(true)) {
+ currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
+ if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
- endTask(new SplitLogTask.Done(this.serverName),
- SplitLogCounters.tot_wkr_task_acquired_rescan);
+ HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
+ SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
return;
}
+
LOG.info("worker " + serverName + " acquired task " + path);
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
- t = System.currentTimeMillis();
- TaskExecutor.Status status;
+ submitTask(path, currentVersion, this.report_period);
- status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
- new CancelableProgressable() {
-
- private long last_report_at = 0;
-
- @Override
- public boolean progress() {
- long t = EnvironmentEdgeManager.currentTimeMillis();
- if ((t - last_report_at) > report_period) {
- last_report_at = t;
- if (!attemptToOwnTask(false)) {
- LOG.warn("Failed to heartbeat the task" + currentTask);
- return false;
- }
- }
- return true;
- }
- });
-
- switch (status) {
- case DONE:
- endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
- break;
- case PREEMPTED:
- SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
- LOG.warn("task execution prempted " + path);
- break;
- case ERR:
- if (!exitWorker) {
- endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
- break;
- }
- // if the RS is exiting then there is probably a tons of stuff
- // that can go wrong. Resign instead of signaling error.
- //$FALL-THROUGH$
- case RESIGNED:
- if (exitWorker) {
- LOG.info("task execution interrupted because worker is exiting " + path);
- }
- endTask(new SplitLogTask.Resigned(this.serverName),
- SplitLogCounters.tot_wkr_task_resigned);
- break;
+ // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
+ try {
+ int sleepTime = RandomUtils.nextInt(500) + 500;
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while yielding for other region servers", e);
+ Thread.currentThread().interrupt();
}
} finally {
- if (t > 0) {
- LOG.info("worker " + serverName + " done with task " + path +
- " in " + (System.currentTimeMillis() - t) + "ms");
- }
synchronized (grabTaskLock) {
workerInGrabTask = false;
// clear the interrupt from stopTask() otherwise the next task will
@@ -412,76 +384,109 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
}
+
/**
- * Try to own the task by transitioning the zk node data from UNASSIGNED to
- * OWNED.
+ * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
*
- * This method is also used to periodically heartbeat the task progress by
- * transitioning the node from OWNED to OWNED.
+ * This method is also used to periodically heartbeat the task progress by transitioning the node
+ * from OWNED to OWNED.
*
- * @return true if task path is successfully locked
+ * @param isFirstTime
+ * @param zkw
+ * @param server
+ * @param task
+ * @param taskZKVersion
+ * @return non-negative integer value when task can be owned by current region server otherwise -1
*/
- private boolean attemptToOwnTask(boolean isFirstTime) {
+ protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
+ ServerName server, String task, int taskZKVersion) {
+ int latestZKVersion = FAILED_TO_OWN_TASK;
try {
- SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
- Stat stat =
- this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
+ SplitLogTask slt = new SplitLogTask.Owned(server);
+ Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) {
- LOG.warn("zk.setData() returned null for path " + currentTask);
+ LOG.warn("zk.setData() returned null for path " + task);
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
- return (false);
+ return FAILED_TO_OWN_TASK;
}
- currentVersion = stat.getVersion();
+ latestZKVersion = stat.getVersion();
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
- return (true);
+ return latestZKVersion;
} catch (KeeperException e) {
if (!isFirstTime) {
if (e.code().equals(KeeperException.Code.NONODE)) {
- LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
+ LOG.warn("NONODE failed to assert ownership for " + task, e);
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
- LOG.warn("BADVERSION failed to assert ownership for " +
- currentTask, e);
+ LOG.warn("BADVERSION failed to assert ownership for " + task, e);
} else {
- LOG.warn("failed to assert ownership for " + currentTask, e);
+ LOG.warn("failed to assert ownership for " + task, e);
}
}
} catch (InterruptedException e1) {
LOG.warn("Interrupted while trying to assert ownership of " +
- currentTask + " " + StringUtils.stringifyException(e1));
+ task + " " + StringUtils.stringifyException(e1));
Thread.currentThread().interrupt();
}
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
- return (false);
+ return FAILED_TO_OWN_TASK;
}
/**
- * endTask() can fail and the only way to recover out of it is for the
- * {@link SplitLogManager} to timeout the task node.
- * @param slt
- * @param ctr
+ * This function calculates how many splitters it could create based on expected average tasks per
+ * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
+ * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
+ * @param numTasks current total number of available tasks
+ * @return
*/
- private void endTask(SplitLogTask slt, AtomicLong ctr) {
- String path = currentTask;
- currentTask = null;
+ private int calculateAvailableSplitters(int numTasks) {
+ // at lease one RS(itself) available
+ int availableRSs = 1;
try {
- if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
- currentVersion)) {
- LOG.info("successfully transitioned task " + path + " to final state " + slt);
- ctr.incrementAndGet();
- return;
- }
- LOG.warn("failed to transistion task " + path + " to end state " + slt +
- " because of version mismatch ");
- } catch (KeeperException.BadVersionException bve) {
- LOG.warn("transisition task " + path + " to " + slt +
- " failed because of version mismatch", bve);
- } catch (KeeperException.NoNodeException e) {
- LOG.fatal("logic error - end task " + path + " " + slt +
- " failed because task doesn't exist", e);
+ List regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+ availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
} catch (KeeperException e) {
- LOG.warn("failed to end task, " + path + " " + slt, e);
+ // do nothing
+ LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
}
- SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+
+ int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
+ expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
+ // calculate how many more splitters we could spawn
+ return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
+ }
+
+ /**
+ * Submit a log split task to executor service
+ * @param curTask
+ * @param curTaskZKVersion
+ */
+ void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
+ final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
+
+ CancelableProgressable reporter = new CancelableProgressable() {
+ private long last_report_at = 0;
+
+ @Override
+ public boolean progress() {
+ long t = EnvironmentEdgeManager.currentTimeMillis();
+ if ((t - last_report_at) > reportPeriod) {
+ last_report_at = t;
+ int latestZKVersion =
+ attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue());
+ if (latestZKVersion < 0) {
+ LOG.warn("Failed to heartbeat the task" + curTask);
+ return false;
+ }
+ zkVersion.setValue(latestZKVersion);
+ }
+ return true;
+ }
+ };
+
+ HLogSplitterHandler hsh =
+ new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
+ this.splitTaskExecutor);
+ this.executorService.submit(hsh);
}
void getDataSetWatchAsync() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
new file mode 100644
index 00000000000..4ac800ef84d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles log splitting a wal
+ */
+@InterfaceAudience.Private
+public class HLogSplitterHandler extends EventHandler {
+ private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
+ private final ServerName serverName;
+ private final String curTask;
+ private final String wal;
+ private final ZooKeeperWatcher zkw;
+ private final CancelableProgressable reporter;
+ private final AtomicInteger inProgressTasks;
+ private final MutableInt curTaskZKVersion;
+ private final TaskExecutor splitTaskExecutor;
+
+ public HLogSplitterHandler(final Server server, String curTask,
+ final MutableInt curTaskZKVersion,
+ CancelableProgressable reporter,
+ AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+ super(server, EventType.RS_LOG_REPLAY);
+ this.curTask = curTask;
+ this.wal = ZKSplitLog.getFileName(curTask);
+ this.reporter = reporter;
+ this.inProgressTasks = inProgressTasks;
+ this.inProgressTasks.incrementAndGet();
+ this.serverName = server.getServerName();
+ this.zkw = server.getZooKeeper();
+ this.curTaskZKVersion = curTaskZKVersion;
+ this.splitTaskExecutor = splitTaskExecutor;
+ }
+
+ @Override
+ public void process() throws IOException {
+ long startTime = System.currentTimeMillis();
+ try {
+ Status status = this.splitTaskExecutor.exec(wal, reporter);
+ switch (status) {
+ case DONE:
+ endTask(zkw, new SplitLogTask.Done(this.serverName),
+ SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
+ break;
+ case PREEMPTED:
+ SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+ LOG.warn("task execution prempted " + wal);
+ break;
+ case ERR:
+ if (server != null && !server.isStopped()) {
+ endTask(zkw, new SplitLogTask.Err(this.serverName),
+ SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
+ break;
+ }
+ // if the RS is exiting then there is probably a tons of stuff
+ // that can go wrong. Resign instead of signaling error.
+ //$FALL-THROUGH$
+ case RESIGNED:
+ if (server != null && server.isStopped()) {
+ LOG.info("task execution interrupted because worker is exiting " + curTask);
+ }
+ endTask(zkw, new SplitLogTask.Resigned(this.serverName),
+ SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
+ break;
+ }
+ } finally {
+ LOG.info("worker " + serverName + " done with task " + curTask + " in "
+ + (System.currentTimeMillis() - startTime) + "ms");
+ this.inProgressTasks.decrementAndGet();
+ }
+ }
+
+ /**
+ * endTask() can fail and the only way to recover out of it is for the
+ * {@link SplitLogManager} to timeout the task node.
+ * @param slt
+ * @param ctr
+ */
+ public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task,
+ int taskZKVersion) {
+ try {
+ if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
+ LOG.info("successfully transitioned task " + task + " to final state " + slt);
+ ctr.incrementAndGet();
+ return;
+ }
+ LOG.warn("failed to transistion task " + task + " to end state " + slt
+ + " because of version mismatch ");
+ } catch (KeeperException.BadVersionException bve) {
+ LOG.warn("transisition task " + task + " to " + slt
+ + " failed because of version mismatch", bve);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.fatal(
+ "logic error - end task " + task + " " + slt
+ + " failed because task doesn't exist", e);
+ } catch (KeeperException e) {
+ LOG.warn("failed to end task, " + task + " " + slt, e);
+ }
+ SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 655f3bafe88..d8b7ed48776 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -798,7 +798,7 @@ public class HLogSplitter {
private OutputSink outputSink = null;
WriterThread(OutputSink sink, int i) {
- super("WriterThread-" + i);
+ super(Thread.currentThread().getName() + "-Writer-" + i);
outputSink = sink;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index ccd63dae858..2b4e4f493d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -155,6 +155,7 @@ public class TestDistributedLogSplitting {
conf.setInt("zookeeper.recovery.retry", 0);
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
+ conf.setInt("hbase.regionserver.wal.max.splitters", 3);
TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.setDFSCluster(dfsCluster);
TEST_UTIL.setZkCluster(zkCluster);
@@ -192,6 +193,7 @@ public class TestDistributedLogSplitting {
@Test (timeout=300000)
public void testRecoveredEdits() throws Exception {
LOG.info("testRecoveredEdits");
+ conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
startCluster(NUM_RS);
@@ -248,10 +250,12 @@ public class TestDistributedLogSplitting {
HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir);
FileStatus[] files = fs.listStatus(editsdir);
- assertEquals(1, files.length);
- int c = countHLog(files[0].getPath(), fs, conf);
- count += c;
- LOG.info(c + " edits in " + files[0].getPath());
+ assertTrue(files.length > 1);
+ for (int i = 0; i < files.length; i++) {
+ int c = countHLog(files[i].getPath(), fs, conf);
+ count += c;
+ }
+ LOG.info(count + " edits in " + files.length + " recovered edits files.");
}
assertEquals(NUM_LOG_LINES, count);
}
@@ -259,6 +263,7 @@ public class TestDistributedLogSplitting {
@Test(timeout = 300000)
public void testLogReplayWithNonMetaRSDown() throws Exception {
LOG.info("testLogReplayWithNonMetaRSDown");
+ conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
startCluster(NUM_RS);
final int NUM_REGIONS_TO_CREATE = 40;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 75a1b713e46..ad816f9c060 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -20,18 +20,25 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -56,6 +63,7 @@ public class TestSplitLogWorker {
new HBaseTestingUtility();
private ZooKeeperWatcher zkw;
private SplitLogWorker slw;
+ private ExecutorService executorService;
private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
throws Exception {
@@ -69,14 +77,14 @@ public class TestSplitLogWorker {
return waitForCounterBoolean(ctr, oldval, newval, timems, true);
}
- private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+ private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
long timems, boolean failIfTimeout) throws Exception {
long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
new Waiter.Predicate() {
@Override
public boolean evaluate() throws Exception {
- return (ctr.get() != oldval);
+ return (ctr.get() >= newval);
}
});
@@ -99,11 +107,18 @@ public class TestSplitLogWorker {
ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
LOG.debug(zkw.splitLogZNode + " created");
+ ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
+ assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
SplitLogCounters.resetCounters();
+ executorService = new ExecutorService("TestSplitLogWorker");
+ executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
}
@After
public void teardown() throws Exception {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
TEST_UTIL.shutdownMiniZKCluster();
}
@@ -132,11 +147,13 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
final String TATAS = "tatas";
final ServerName RS = ServerName.valueOf("rs,1,1");
+ RegionServerServices mockedRS = getRegionServer(RS);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask);
+ SplitLogWorker slw =
+ new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
try {
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
@@ -169,9 +186,12 @@ public class TestSplitLogWorker {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
-
- SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR1, neverEndingTask);
- SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR2, neverEndingTask);
+ RegionServerServices mockedRS1 = getRegionServer(SVR1);
+ RegionServerServices mockedRS2 = getRegionServer(SVR2);
+ SplitLogWorker slw1 =
+ new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+ SplitLogWorker slw2 =
+ new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
slw1.start();
slw2.start();
try {
@@ -195,7 +215,9 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
- SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+ RegionServerServices mockedRS = getRegionServer(SRV);
+ SplitLogWorker slw =
+ new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
@@ -226,7 +248,9 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
- SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+ RegionServerServices mockedRS = getRegionServer(SRV);
+ SplitLogWorker slw =
+ new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
@@ -266,7 +290,8 @@ public class TestSplitLogWorker {
LOG.info("testRescan");
SplitLogCounters.resetCounters();
final ServerName SRV = ServerName.valueOf("svr,1,1");
- slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+ RegionServerServices mockedRS = getRegionServer(SRV);
+ slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
slw.start();
Thread.yield(); // let the worker start
Thread.sleep(100);
@@ -312,4 +337,100 @@ public class TestSplitLogWorker {
assertEquals(2, num);
}
+ @Test
+ public void testAcquireMultiTasks() throws Exception {
+ LOG.info("testAcquireMultiTasks");
+ SplitLogCounters.resetCounters();
+ final String TATAS = "tatas";
+ final ServerName RS = ServerName.valueOf("rs,1,1");
+ final int maxTasks = 3;
+ Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
+ RegionServerServices mockedRS = getRegionServer(RS);
+
+ for (int i = 0; i < maxTasks; i++) {
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+ slw.start();
+ try {
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, 3000);
+ for (int i = 0; i < maxTasks; i++) {
+ byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
+ SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+ assertTrue(slt.isOwned(RS));
+ }
+ } finally {
+ stopSplitLogWorker(slw);
+ }
+ }
+
+ /**
+ * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
+ * RS
+ * @throws Exception
+ */
+ @Test
+ public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
+ LOG.info("testAcquireMultiTasks");
+ SplitLogCounters.resetCounters();
+ final String TATAS = "tatas";
+ final ServerName RS = ServerName.valueOf("rs,1,1");
+ final ServerName RS2 = ServerName.valueOf("rs,1,2");
+ final int maxTasks = 3;
+ Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
+ RegionServerServices mockedRS = getRegionServer(RS);
+
+ // create two RS nodes
+ String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
+ zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
+ zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ for (int i = 0; i < maxTasks; i++) {
+ zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+
+ SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+ slw.start();
+ try {
+ int acquiredTasks = 0;
+ waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, 3000);
+ for (int i = 0; i < maxTasks; i++) {
+ byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
+ SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+ if (slt.isOwned(RS)) {
+ acquiredTasks++;
+ }
+ }
+ assertEquals(2, acquiredTasks);
+ } finally {
+ stopSplitLogWorker(slw);
+ }
+ }
+
+ /**
+ * Create a mocked region server service instance
+ * @param server
+ * @return
+ */
+ private RegionServerServices getRegionServer(ServerName name) {
+
+ RegionServerServices mockedServer = mock(RegionServerServices.class);
+ when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
+ when(mockedServer.getServerName()).thenReturn(name);
+ when(mockedServer.getZooKeeper()).thenReturn(zkw);
+ when(mockedServer.isStopped()).thenReturn(false);
+ when(mockedServer.getExecutorService()).thenReturn(executorService);
+
+ return mockedServer;
+ }
+
}