HBASE-9736: Allow more than one log splitter per RS
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1545052 13f79535-47bb-0310-9956-ffa450edef68
parent 129ce2523d
commit ad5436eaa1
@@ -250,7 +250,15 @@ public enum EventType {
    *
    * RS_PARALLEL_SEEK
    */
-  RS_PARALLEL_SEEK         (80, ExecutorType.RS_PARALLEL_SEEK);
+  RS_PARALLEL_SEEK         (80, ExecutorType.RS_PARALLEL_SEEK),
+
+  /**
+   * RS wal recovery work items(either creating recover.edits or directly replay wals)
+   * to be executed on the RS.<br>
+   *
+   * RS_LOG_REPLAY
+   */
+  RS_LOG_REPLAY            (81, ExecutorType.RS_LOG_REPLAY_OPS);
 
   private final int code;
   private final ExecutorType executor;
@@ -44,7 +44,8 @@ public enum ExecutorType {
   RS_CLOSE_REGION          (23),
   RS_CLOSE_ROOT            (24),
   RS_CLOSE_META            (25),
-  RS_PARALLEL_SEEK         (26);
+  RS_PARALLEL_SEEK         (26),
+  RS_LOG_REPLAY_OPS        (27);
 
   ExecutorType(int value) {}
 
@@ -1583,6 +1583,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
     }
+    this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
+      conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));
 
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
         uncaughtExceptionHandler);
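Note: the size of the new RS_LOG_REPLAY_OPS pool is taken from "hbase.regionserver.wal.max.splitters", falling back to SplitLogWorker.DEFAULT_MAX_SPLITTERS (2). A minimal sketch of raising the limit programmatically, assuming only the standard HBase client classes; the value 4 is just an example, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MaxSplittersConfigExample {
  public static void main(String[] args) {
    // Start from the usual HBase configuration (hbase-site.xml on the classpath).
    Configuration conf = HBaseConfiguration.create();
    // Allow up to 4 concurrent log-split tasks per region server (patch default is 2).
    conf.setInt("hbase.regionserver.wal.max.splitters", 4);
    System.out.println("max splitters = "
        + conf.getInt("hbase.regionserver.wal.max.splitters", 2));
  }
}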
@@ -25,23 +25,27 @@ import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -78,12 +82,17 @@ import org.apache.zookeeper.data.Stat;
  */
 @InterfaceAudience.Private
 public class SplitLogWorker extends ZooKeeperListener implements Runnable {
+  public static final int DEFAULT_MAX_SPLITTERS = 2;
+
   private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
   private static final int checkInterval = 5000; // 5 seconds
+  private static final int FAILED_TO_OWN_TASK = -1;
 
   Thread worker;
   private final ServerName serverName;
   private final TaskExecutor splitTaskExecutor;
+  // thread pool which executes recovery work
+  private final ExecutorService executorService;
 
   private final Object taskReadyLock = new Object();
   volatile int taskReadySeq = 0;
@@ -95,9 +104,11 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
   private final int report_period;
   private RegionServerServices server = null;
   private Configuration conf = null;
+  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
+  private int maxConcurrentTasks = 0;
 
-  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
-      RegionServerServices server, TaskExecutor splitTaskExecutor) {
+  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
+      TaskExecutor splitTaskExecutor) {
     super(watcher);
     this.server = server;
     this.serverName = server.getServerName();
@@ -105,16 +116,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
     report_period = conf.getInt("hbase.splitlog.report.period",
       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
     this.conf = conf;
-  }
-
-  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
-      TaskExecutor splitTaskExecutor) {
-    super(watcher);
-    this.serverName = serverName;
-    this.splitTaskExecutor = splitTaskExecutor;
-    report_period = conf.getInt("hbase.splitlog.report.period",
-      conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
-    this.conf = conf;
+    this.executorService = this.server.getExecutorService();
+    this.maxConcurrentTasks =
+      conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
   }
 
   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
@@ -238,11 +242,18 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
               break;
             }
           }
-          for (int i = 0; i < paths.size(); i ++) {
+          int numTasks = paths.size();
+          for (int i = 0; i < numTasks; i++) {
             int idx = (i + offset) % paths.size();
             // don't call ZKSplitLog.getNodeName() because that will lead to
             // double encoding of the path name
-            grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+            if (this.calculateAvailableSplitters(numTasks) > 0) {
+              grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+            } else {
+              LOG.debug("Current region server " + this.serverName + " has "
+                  + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+              break;
+            }
             if (exitWorker) {
               return;
             }
@@ -337,72 +348,33 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         return;
       }
 
-      currentVersion = stat.getVersion();
-      if (!attemptToOwnTask(true)) {
+      currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
+      if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
         return;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
-        endTask(new SplitLogTask.Done(this.serverName),
-          SplitLogCounters.tot_wkr_task_acquired_rescan);
+        HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
+          SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
         return;
       }
 
       LOG.info("worker " + serverName + " acquired task " + path);
       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
       getDataSetWatchAsync();
 
-      t = System.currentTimeMillis();
-      TaskExecutor.Status status;
-
-      status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
-        new CancelableProgressable() {
-
-        private long last_report_at = 0;
-
-        @Override
-        public boolean progress() {
-          long t = EnvironmentEdgeManager.currentTimeMillis();
-          if ((t - last_report_at) > report_period) {
-            last_report_at = t;
-            if (!attemptToOwnTask(false)) {
-              LOG.warn("Failed to heartbeat the task" + currentTask);
-              return false;
-            }
-          }
-          return true;
-        }
-      });
-
-      switch (status) {
-        case DONE:
-          endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
-          break;
-        case PREEMPTED:
-          SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
-          LOG.warn("task execution prempted " + path);
-          break;
-        case ERR:
-          if (!exitWorker) {
-            endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
-            break;
-          }
-          // if the RS is exiting then there is probably a tons of stuff
-          // that can go wrong. Resign instead of signaling error.
-          //$FALL-THROUGH$
-        case RESIGNED:
-          if (exitWorker) {
-            LOG.info("task execution interrupted because worker is exiting " + path);
-          }
-          endTask(new SplitLogTask.Resigned(this.serverName),
-            SplitLogCounters.tot_wkr_task_resigned);
-          break;
+      submitTask(path, currentVersion, this.report_period);
+
+      // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
+      try {
+        int sleepTime = RandomUtils.nextInt(500) + 500;
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while yielding for other region servers", e);
+        Thread.currentThread().interrupt();
       }
     } finally {
-      if (t > 0) {
-        LOG.info("worker " + serverName + " done with task " + path +
-          " in " + (System.currentTimeMillis() - t) + "ms");
-      }
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
         // clear the interrupt from stopTask() otherwise the next task will
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to own the task by transitioning the zk node data from UNASSIGNED to
|
* Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
|
||||||
* OWNED.
|
|
||||||
* <p>
|
* <p>
|
||||||
* This method is also used to periodically heartbeat the task progress by
|
* This method is also used to periodically heartbeat the task progress by transitioning the node
|
||||||
* transitioning the node from OWNED to OWNED.
|
* from OWNED to OWNED.
|
||||||
* <p>
|
* <p>
|
||||||
* @return true if task path is successfully locked
|
* @param isFirstTime
|
||||||
|
* @param zkw
|
||||||
|
* @param server
|
||||||
|
* @param task
|
||||||
|
* @param taskZKVersion
|
||||||
|
* @return non-negative integer value when task can be owned by current region server otherwise -1
|
||||||
*/
|
*/
|
||||||
private boolean attemptToOwnTask(boolean isFirstTime) {
|
protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
|
||||||
|
ServerName server, String task, int taskZKVersion) {
|
||||||
|
int latestZKVersion = FAILED_TO_OWN_TASK;
|
||||||
try {
|
try {
|
||||||
SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
|
SplitLogTask slt = new SplitLogTask.Owned(server);
|
||||||
Stat stat =
|
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
|
||||||
this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
|
|
||||||
if (stat == null) {
|
if (stat == null) {
|
||||||
LOG.warn("zk.setData() returned null for path " + currentTask);
|
LOG.warn("zk.setData() returned null for path " + task);
|
||||||
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
|
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
|
||||||
return (false);
|
return FAILED_TO_OWN_TASK;
|
||||||
}
|
}
|
||||||
currentVersion = stat.getVersion();
|
latestZKVersion = stat.getVersion();
|
||||||
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
|
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
|
||||||
return (true);
|
return latestZKVersion;
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
if (!isFirstTime) {
|
if (!isFirstTime) {
|
||||||
if (e.code().equals(KeeperException.Code.NONODE)) {
|
if (e.code().equals(KeeperException.Code.NONODE)) {
|
||||||
LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
|
LOG.warn("NONODE failed to assert ownership for " + task, e);
|
||||||
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
|
} else if (e.code().equals(KeeperException.Code.BADVERSION)) {
|
||||||
LOG.warn("BADVERSION failed to assert ownership for " +
|
LOG.warn("BADVERSION failed to assert ownership for " + task, e);
|
||||||
currentTask, e);
|
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("failed to assert ownership for " + currentTask, e);
|
LOG.warn("failed to assert ownership for " + task, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e1) {
|
} catch (InterruptedException e1) {
|
||||||
LOG.warn("Interrupted while trying to assert ownership of " +
|
LOG.warn("Interrupted while trying to assert ownership of " +
|
||||||
currentTask + " " + StringUtils.stringifyException(e1));
|
task + " " + StringUtils.stringifyException(e1));
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
|
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
|
||||||
return (false);
|
return FAILED_TO_OWN_TASK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* endTask() can fail and the only way to recover out of it is for the
|
* This function calculates how many splitters it could create based on expected average tasks per
|
||||||
* {@link SplitLogManager} to timeout the task node.
|
* RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
|
||||||
* @param slt
|
* At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
|
||||||
* @param ctr
|
* @param numTasks current total number of available tasks
|
||||||
|
* @return
|
||||||
*/
|
*/
|
||||||
private void endTask(SplitLogTask slt, AtomicLong ctr) {
|
private int calculateAvailableSplitters(int numTasks) {
|
||||||
String path = currentTask;
|
// at lease one RS(itself) available
|
||||||
currentTask = null;
|
int availableRSs = 1;
|
||||||
try {
|
try {
|
||||||
if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
|
List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
|
||||||
currentVersion)) {
|
availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
|
||||||
LOG.info("successfully transitioned task " + path + " to final state " + slt);
|
|
||||||
ctr.incrementAndGet();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
LOG.warn("failed to transistion task " + path + " to end state " + slt +
|
|
||||||
" because of version mismatch ");
|
|
||||||
} catch (KeeperException.BadVersionException bve) {
|
|
||||||
LOG.warn("transisition task " + path + " to " + slt +
|
|
||||||
" failed because of version mismatch", bve);
|
|
||||||
} catch (KeeperException.NoNodeException e) {
|
|
||||||
LOG.fatal("logic error - end task " + path + " " + slt +
|
|
||||||
" failed because task doesn't exist", e);
|
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.warn("failed to end task, " + path + " " + slt, e);
|
// do nothing
|
||||||
|
LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
|
||||||
}
|
}
|
||||||
SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
|
|
||||||
|
int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
|
||||||
|
expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
|
||||||
|
// calculate how many more splitters we could spawn
|
||||||
|
return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submit a log split task to executor service
|
||||||
|
* @param curTask
|
||||||
|
* @param curTaskZKVersion
|
||||||
|
*/
|
||||||
|
void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
|
||||||
|
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
|
||||||
|
|
||||||
|
CancelableProgressable reporter = new CancelableProgressable() {
|
||||||
|
private long last_report_at = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean progress() {
|
||||||
|
long t = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
if ((t - last_report_at) > reportPeriod) {
|
||||||
|
last_report_at = t;
|
||||||
|
int latestZKVersion =
|
||||||
|
attemptToOwnTask(false, watcher, serverName, curTask, zkVersion.intValue());
|
||||||
|
if (latestZKVersion < 0) {
|
||||||
|
LOG.warn("Failed to heartbeat the task" + curTask);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
zkVersion.setValue(latestZKVersion);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
HLogSplitterHandler hsh =
|
||||||
|
new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
|
||||||
|
this.splitTaskExecutor);
|
||||||
|
this.executorService.submit(hsh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void getDataSetWatchAsync() {
|
void getDataSetWatchAsync() {
|
||||||
|
|
|
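The heart of the change is the splitter budget above: a region server may run at most MIN(ceil(numTasks / liveRegionServers), hbase.regionserver.wal.max.splitters) split tasks, minus the ones already in flight. A self-contained sketch of that arithmetic under the same assumptions; the class and method names here are illustrative, not part of the patch:

public class SplitterBudgetExample {
  /** Mirrors the calculation in SplitLogWorker#calculateAvailableSplitters. */
  static int availableSplitters(int numTasks, int liveRegionServers,
      int maxConcurrentTasks, int tasksInProgress) {
    int availableRSs = Math.max(1, liveRegionServers);            // at least this RS itself
    int expectedTasksPerRS = (numTasks + availableRSs - 1) / availableRSs; // ceiling division
    expectedTasksPerRS = Math.max(1, expectedTasksPerRS);
    return Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress;
  }

  public static void main(String[] args) {
    // 5 tasks, 2 live region servers, hard cap 3, 1 task already running:
    // ceil(5/2) = 3, min(3, 3) = 3, 3 - 1 = 2 more splitters may be spawned.
    System.out.println(availableSplitters(5, 2, 3, 1)); // prints 2
  }
}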
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles log splitting a wal
+ */
+@InterfaceAudience.Private
+public class HLogSplitterHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
+  private final ServerName serverName;
+  private final String curTask;
+  private final String wal;
+  private final ZooKeeperWatcher zkw;
+  private final CancelableProgressable reporter;
+  private final AtomicInteger inProgressTasks;
+  private final MutableInt curTaskZKVersion;
+  private final TaskExecutor splitTaskExecutor;
+
+  public HLogSplitterHandler(final Server server, String curTask,
+      final MutableInt curTaskZKVersion,
+      CancelableProgressable reporter,
+      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+    super(server, EventType.RS_LOG_REPLAY);
+    this.curTask = curTask;
+    this.wal = ZKSplitLog.getFileName(curTask);
+    this.reporter = reporter;
+    this.inProgressTasks = inProgressTasks;
+    this.inProgressTasks.incrementAndGet();
+    this.serverName = server.getServerName();
+    this.zkw = server.getZooKeeper();
+    this.curTaskZKVersion = curTaskZKVersion;
+    this.splitTaskExecutor = splitTaskExecutor;
+  }
+
+  @Override
+  public void process() throws IOException {
+    long startTime = System.currentTimeMillis();
+    try {
+      Status status = this.splitTaskExecutor.exec(wal, reporter);
+      switch (status) {
+      case DONE:
+        endTask(zkw, new SplitLogTask.Done(this.serverName),
+          SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
+        break;
+      case PREEMPTED:
+        SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+        LOG.warn("task execution prempted " + wal);
+        break;
+      case ERR:
+        if (server != null && !server.isStopped()) {
+          endTask(zkw, new SplitLogTask.Err(this.serverName),
+            SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
+          break;
+        }
+        // if the RS is exiting then there is probably a tons of stuff
+        // that can go wrong. Resign instead of signaling error.
+        //$FALL-THROUGH$
+      case RESIGNED:
+        if (server != null && server.isStopped()) {
+          LOG.info("task execution interrupted because worker is exiting " + curTask);
+        }
+        endTask(zkw, new SplitLogTask.Resigned(this.serverName),
+          SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
+        break;
+      }
+    } finally {
+      LOG.info("worker " + serverName + " done with task " + curTask + " in "
+          + (System.currentTimeMillis() - startTime) + "ms");
+      this.inProgressTasks.decrementAndGet();
+    }
+  }
+
+  /**
+   * endTask() can fail and the only way to recover out of it is for the
+   * {@link SplitLogManager} to timeout the task node.
+   * @param slt
+   * @param ctr
+   */
+  public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task,
+      int taskZKVersion) {
+    try {
+      if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
+        LOG.info("successfully transitioned task " + task + " to final state " + slt);
+        ctr.incrementAndGet();
+        return;
+      }
+      LOG.warn("failed to transistion task " + task + " to end state " + slt
+          + " because of version mismatch ");
+    } catch (KeeperException.BadVersionException bve) {
+      LOG.warn("transisition task " + task + " to " + slt
+          + " failed because of version mismatch", bve);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.fatal(
+        "logic error - end task " + task + " " + slt
+            + " failed because task doesn't exist", e);
+    } catch (KeeperException e) {
+      LOG.warn("failed to end task, " + task + " " + slt, e);
+    }
+    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+  }
+}
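The new handler bounds concurrency through the shared AtomicInteger: it increments the counter in its constructor and decrements it in the finally block of process(), so the worker's budget check always sees how many splits are still in flight. A generic sketch of that counter pattern outside HBase, assuming nothing beyond the JDK; the class name is illustrative:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class InFlightCounterExample {
  public static void main(String[] args) throws InterruptedException {
    AtomicInteger inProgress = new AtomicInteger(0);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 4; i++) {
      inProgress.incrementAndGet();            // count the task as soon as it is queued
      pool.submit(() -> {
        try {
          Thread.sleep(100);                   // stand-in for the actual split work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          inProgress.decrementAndGet();        // always release the slot, even on failure
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("tasks still in flight: " + inProgress.get()); // prints 0
  }
}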
@@ -798,7 +798,7 @@ public class HLogSplitter {
     private OutputSink outputSink = null;
 
     WriterThread(OutputSink sink, int i) {
-      super("WriterThread-" + i);
+      super(Thread.currentThread().getName() + "-Writer-" + i);
       outputSink = sink;
     }
 
@@ -155,6 +155,7 @@ public class TestDistributedLogSplitting {
     conf.setInt("zookeeper.recovery.retry", 0);
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
+    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
     TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.setDFSCluster(dfsCluster);
     TEST_UTIL.setZkCluster(zkCluster);
@@ -192,6 +193,7 @@ public class TestDistributedLogSplitting {
   @Test (timeout=300000)
   public void testRecoveredEdits() throws Exception {
     LOG.info("testRecoveredEdits");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     startCluster(NUM_RS);
 
@@ -248,10 +250,12 @@ public class TestDistributedLogSplitting {
         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
       FileStatus[] files = fs.listStatus(editsdir);
-      assertEquals(1, files.length);
-      int c = countHLog(files[0].getPath(), fs, conf);
-      count += c;
-      LOG.info(c + " edits in " + files[0].getPath());
+      assertTrue(files.length > 1);
+      for (int i = 0; i < files.length; i++) {
+        int c = countHLog(files[i].getPath(), fs, conf);
+        count += c;
+      }
+      LOG.info(count + " edits in " + files.length + " recovered edits files.");
     }
     assertEquals(NUM_LOG_LINES, count);
   }
@@ -259,6 +263,7 @@ public class TestDistributedLogSplitting {
   @Test(timeout = 300000)
   public void testLogReplayWithNonMetaRSDown() throws Exception {
     LOG.info("testLogReplayWithNonMetaRSDown");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     startCluster(NUM_RS);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -20,18 +20,25 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -56,6 +63,7 @@ public class TestSplitLogWorker {
     new HBaseTestingUtility();
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
+  private ExecutorService executorService;
 
   private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
       throws Exception {
@@ -69,14 +77,14 @@ public class TestSplitLogWorker {
     return waitForCounterBoolean(ctr, oldval, newval, timems, true);
   }
 
-  private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+  private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
       long timems, boolean failIfTimeout) throws Exception {
 
     long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
       new Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
-          return (ctr.get() != oldval);
+          return (ctr.get() >= newval);
         }
       });
@@ -99,11 +107,18 @@ public class TestSplitLogWorker {
     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
     LOG.debug(zkw.splitLogZNode + " created");
+    ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
     SplitLogCounters.resetCounters();
+    executorService = new ExecutorService("TestSplitLogWorker");
+    executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
   }
 
   @After
   public void teardown() throws Exception {
+    if (executorService != null) {
+      executorService.shutdown();
+    }
     TEST_UTIL.shutdownMiniZKCluster();
   }
 
@@ -132,11 +147,13 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final String TATAS = "tatas";
     final ServerName RS = ServerName.valueOf("rs,1,1");
+    RegionServerServices mockedRS = getRegionServer(RS);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
       new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
 
-    SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask);
+    SplitLogWorker slw =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
@@ -169,9 +186,12 @@ public class TestSplitLogWorker {
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
       new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
-    SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR1, neverEndingTask);
-    SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SVR2, neverEndingTask);
+    RegionServerServices mockedRS1 = getRegionServer(SVR1);
+    RegionServerServices mockedRS2 = getRegionServer(SVR2);
+    SplitLogWorker slw1 =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+    SplitLogWorker slw2 =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
     slw1.start();
     slw2.start();
     try {
@@ -195,7 +215,9 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
-    SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+    RegionServerServices mockedRS = getRegionServer(SRV);
+    SplitLogWorker slw =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -226,7 +248,9 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
-    SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+    RegionServerServices mockedRS = getRegionServer(SRV);
+    SplitLogWorker slw =
+        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -266,7 +290,8 @@ public class TestSplitLogWorker {
     LOG.info("testRescan");
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("svr,1,1");
-    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), SRV, neverEndingTask);
+    RegionServerServices mockedRS = getRegionServer(SRV);
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     Thread.yield(); // let the worker start
     Thread.sleep(100);
@@ -312,4 +337,100 @@ public class TestSplitLogWorker {
     assertEquals(2, num);
   }
 
+  @Test
+  public void testAcquireMultiTasks() throws Exception {
+    LOG.info("testAcquireMultiTasks");
+    SplitLogCounters.resetCounters();
+    final String TATAS = "tatas";
+    final ServerName RS = ServerName.valueOf("rs,1,1");
+    final int maxTasks = 3;
+    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
+    RegionServerServices mockedRS = getRegionServer(RS);
+
+    for (int i = 0; i < maxTasks; i++) {
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+    slw.start();
+    try {
+      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, 3000);
+      for (int i = 0; i < maxTasks; i++) {
+        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
+        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+        assertTrue(slt.isOwned(RS));
+      }
+    } finally {
+      stopSplitLogWorker(slw);
+    }
+  }
+
+  /**
+   * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
+   * RS
+   * @throws Exception
+   */
+  @Test
+  public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
+    LOG.info("testAcquireMultiTasks");
+    SplitLogCounters.resetCounters();
+    final String TATAS = "tatas";
+    final ServerName RS = ServerName.valueOf("rs,1,1");
+    final ServerName RS2 = ServerName.valueOf("rs,1,2");
+    final int maxTasks = 3;
+    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
+    RegionServerServices mockedRS = getRegionServer(RS);
+
+    // create two RS nodes
+    String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
+    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
+    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+    for (int i = 0; i < maxTasks; i++) {
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    }
+
+    SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+    slw.start();
+    try {
+      int acquiredTasks = 0;
+      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, 3000);
+      for (int i = 0; i < maxTasks; i++) {
+        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
+        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
+        if (slt.isOwned(RS)) {
+          acquiredTasks++;
+        }
+      }
+      assertEquals(2, acquiredTasks);
+    } finally {
+      stopSplitLogWorker(slw);
+    }
+  }
+
+  /**
+   * Create a mocked region server service instance
+   * @param server
+   * @return
+   */
+  private RegionServerServices getRegionServer(ServerName name) {
+
+    RegionServerServices mockedServer = mock(RegionServerServices.class);
+    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
+    when(mockedServer.getServerName()).thenReturn(name);
+    when(mockedServer.getZooKeeper()).thenReturn(zkw);
+    when(mockedServer.isStopped()).thenReturn(false);
+    when(mockedServer.getExecutorService()).thenReturn(executorService);
+
+    return mockedServer;
+  }
+
 }