HBASE-4820. Distributed log splitting coding enhancement to make it easier to understand, no semantics change. Contributed by Jimmy Xiang.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1208801 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-11-30 21:28:36 +00:00
parent b290b43264
commit 54e463f519
7 changed files with 57 additions and 40 deletions

View File

@@ -1546,8 +1546,8 @@ public class AssignmentManager extends ZooKeeperListener {
// In case of reassignment the current state in memory need not be // In case of reassignment the current state in memory need not be
// OFFLINE. // OFFLINE.
if (!hijack && !state.isClosed() && !state.isOffline()) { if (!hijack && !state.isClosed() && !state.isOffline()) {
this.master.abort("Unexpected state trying to OFFLINE; " + state, String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
new IllegalStateException()); this.master.abort(msg, new IllegalStateException(msg));
return -1; return -1;
} }
boolean allowZNodeCreation = false; boolean allowZNodeCreation = false;

View File

@@ -268,9 +268,7 @@ public class MasterFileSystem {
} }
if (distributedLogSplitting) { if (distributedLogSplitting) {
for (ServerName serverName : serverNames) { splitLogManager.handleDeadWorkers(serverNames);
splitLogManager.handleDeadWorker(serverName.toString());
}
splitTime = EnvironmentEdgeManager.currentTimeMillis(); splitTime = EnvironmentEdgeManager.currentTimeMillis();
try { try {
splitLogSize = splitLogManager.splitLogDistributed(logDirs); splitLogSize = splitLogManager.splitLogDistributed(logDirs);

View File

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -243,12 +244,12 @@ public class SplitLogManager extends ZooKeeperListener {
// recover-lease is done. totalSize will be under in most cases and the // recover-lease is done. totalSize will be under in most cases and the
// metrics that it drives will also be under-reported. // metrics that it drives will also be under-reported.
totalSize += lf.getLen(); totalSize += lf.getLen();
if (installTask(lf.getPath().toString(), batch) == false) { if (enqueueSplitTask(lf.getPath().toString(), batch) == false) {
throw new IOException("duplicate log split scheduled for " throw new IOException("duplicate log split scheduled for "
+ lf.getPath()); + lf.getPath());
} }
} }
waitTasks(batch, status); waitForSplittingCompletion(batch, status);
if (batch.done != batch.installed) { if (batch.done != batch.installed) {
stopTrackingTasks(batch); stopTrackingTasks(batch);
tot_mgr_log_split_batch_err.incrementAndGet(); tot_mgr_log_split_batch_err.incrementAndGet();
@@ -278,7 +279,14 @@ public class SplitLogManager extends ZooKeeperListener {
return totalSize; return totalSize;
} }
boolean installTask(String taskname, TaskBatch batch) { /**
* Add a task entry to splitlog znode if it is not already there.
*
* @param taskname the path of the log to be split
* @param batch the batch this task belongs to
* @return true if a new entry is created, false if it is already there.
*/
boolean enqueueSplitTask(String taskname, TaskBatch batch) {
tot_mgr_log_split_start.incrementAndGet(); tot_mgr_log_split_start.incrementAndGet();
String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
Task oldtask = createTaskIfAbsent(path, batch); Task oldtask = createTaskIfAbsent(path, batch);
@@ -292,7 +300,7 @@ public class SplitLogManager extends ZooKeeperListener {
return false; return false;
} }
private void waitTasks(TaskBatch batch, MonitoredTask status) { private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
synchronized (batch) { synchronized (batch) {
while ((batch.done + batch.error) != batch.installed) { while ((batch.done + batch.error) != batch.installed) {
try { try {
@@ -371,7 +379,7 @@ public class SplitLogManager extends ZooKeeperListener {
} }
private void createNodeFailure(String path) { private void createNodeFailure(String path) {
// TODO the Manger should split the log locally instead of giving up // TODO the Manager should split the log locally instead of giving up
LOG.warn("failed to create task node" + path); LOG.warn("failed to create task node" + path);
setDone(path, FAILURE); setDone(path, FAILURE);
} }
@@ -767,16 +775,30 @@ public class SplitLogManager extends ZooKeeperListener {
} }
} }
void handleDeadWorker(String worker_name) { void handleDeadWorker(String workerName) {
// resubmit the tasks on the TimeoutMonitor thread. Makes it easier // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
// to reason about concurrency. Makes it easier to retry. // to reason about concurrency. Makes it easier to retry.
synchronized (deadWorkersLock) { synchronized (deadWorkersLock) {
if (deadWorkers == null) { if (deadWorkers == null) {
deadWorkers = new HashSet<String>(100); deadWorkers = new HashSet<String>(100);
} }
deadWorkers.add(worker_name); deadWorkers.add(workerName);
} }
LOG.info("dead splitlog worker " + worker_name); LOG.info("dead splitlog worker " + workerName);
}
void handleDeadWorkers(List<ServerName> serverNames) {
List<String> workerNames = new ArrayList<String>(serverNames.size());
for (ServerName serverName : serverNames) {
workerNames.add(serverName.toString());
}
synchronized (deadWorkersLock) {
if (deadWorkers == null) {
deadWorkers = new HashSet<String>(100);
}
deadWorkers.addAll(workerNames);
}
LOG.info("dead splitlog workers " + workerNames);
} }
/** /**
@@ -871,7 +893,7 @@ public class SplitLogManager extends ZooKeeperListener {
} else { } else {
Long retry_count = (Long)ctx; Long retry_count = (Long)ctx;
LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
path + " retry=" + retry_count); path + " remaining retries=" + retry_count);
if (retry_count == 0) { if (retry_count == 0) {
tot_mgr_node_create_err.incrementAndGet(); tot_mgr_node_create_err.incrementAndGet();
createNodeFailure(path); createNodeFailure(path);
@@ -900,7 +922,7 @@ public class SplitLogManager extends ZooKeeperListener {
if (rc != 0) { if (rc != 0) {
Long retry_count = (Long) ctx; Long retry_count = (Long) ctx;
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
path + " retry=" + retry_count); path + " remaining retries=" + retry_count);
if (retry_count == 0) { if (retry_count == 0) {
tot_mgr_get_data_err.incrementAndGet(); tot_mgr_get_data_err.incrementAndGet();
getDataSetWatchFailure(path); getDataSetWatchFailure(path);
@@ -930,7 +952,7 @@ public class SplitLogManager extends ZooKeeperListener {
tot_mgr_node_delete_err.incrementAndGet(); tot_mgr_node_delete_err.incrementAndGet();
Long retry_count = (Long) ctx; Long retry_count = (Long) ctx;
LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
path + " retry=" + retry_count); path + " remaining retries=" + retry_count);
if (retry_count == 0) { if (retry_count == 0) {
LOG.warn("delete failed " + path); LOG.warn("delete failed " + path);
deleteNodeFailure(path); deleteNodeFailure(path);
@@ -965,7 +987,7 @@ public class SplitLogManager extends ZooKeeperListener {
if (rc != 0) { if (rc != 0) {
Long retry_count = (Long)ctx; Long retry_count = (Long)ctx;
LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path + LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
" retry=" + retry_count); " remaining retries=" + retry_count);
if (retry_count == 0) { if (retry_count == 0) {
createRescanFailure(); createRescanFailure();
} else { } else {

View File

@@ -54,8 +54,8 @@ import org.apache.zookeeper.data.Stat;
* <p> * <p>
* If a worker has successfully moved the task from state UNASSIGNED to * If a worker has successfully moved the task from state UNASSIGNED to
* OWNED then it owns the task. It keeps heart beating the manager by * OWNED then it owns the task. It keeps heart beating the manager by
* periodically moving the task from OWNED to OWNED state. On success it * periodically moving the task from UNASSIGNED to OWNED state. On success it
* moves the task to SUCCESS. On unrecoverable error it moves task state to * moves the task to TASK_DONE. On unrecoverable error it moves task state to
* ERR. If it cannot continue but wants the master to retry the task then it * ERR. If it cannot continue but wants the master to retry the task then it
* moves the task state to RESIGNED. * moves the task state to RESIGNED.
* <p> * <p>
@@ -70,7 +70,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
Thread worker; Thread worker;
private final String serverName; private final String serverName;
private final TaskExecutor executor; private final TaskExecutor splitTaskExecutor;
private long zkretries; private long zkretries;
private Object taskReadyLock = new Object(); private Object taskReadyLock = new Object();
@@ -83,10 +83,10 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
String serverName, TaskExecutor executor) { String serverName, TaskExecutor splitTaskExecutor) {
super(watcher); super(watcher);
this.serverName = serverName; this.serverName = serverName;
this.executor = executor; this.splitTaskExecutor = splitTaskExecutor;
this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3); this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3);
} }
@@ -247,7 +247,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
} }
currentVersion = stat.getVersion(); currentVersion = stat.getVersion();
if (ownTask(true) == false) { if (attemptToOwnTask(true) == false) {
tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
return; return;
} }
@@ -263,12 +263,12 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
t = System.currentTimeMillis(); t = System.currentTimeMillis();
TaskExecutor.Status status; TaskExecutor.Status status;
status = executor.exec(ZKSplitLog.getFileName(currentTask), status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
new CancelableProgressable() { new CancelableProgressable() {
@Override @Override
public boolean progress() { public boolean progress() {
if (ownTask(false) == false) { if (attemptToOwnTask(false) == false) {
LOG.warn("Failed to heartbeat the task" + currentTask); LOG.warn("Failed to heartbeat the task" + currentTask);
return false; return false;
} }
@@ -327,7 +327,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
* <p> * <p>
* @return true if task path is successfully locked * @return true if task path is successfully locked
*/ */
private boolean ownTask(boolean isFirstTime) { private boolean attemptToOwnTask(boolean isFirstTime) {
try { try {
Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask, Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
TaskState.TASK_OWNED.get(serverName), currentVersion); TaskState.TASK_OWNED.get(serverName), currentVersion);
@@ -405,7 +405,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
String taskpath = currentTask; String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) { if (taskpath != null && taskpath.equals(path)) {
// have to compare data. cannot compare version because then there // have to compare data. cannot compare version because then there
// will be race with ownTask() // will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to // cannot just check whether the node has been transitioned to
// UNASSIGNED because by the time this worker sets the data watch // UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this // the node might have made two transitions - from owned by this
@@ -446,7 +446,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
@Override @Override
public void nodeDataChanged(String path) { public void nodeDataChanged(String path) {
// there will be a self generated dataChanged event every time ownTask() // there will be a self generated dataChanged event every time attemptToOwnTask()
// heartbeats the task znode by upping its version // heartbeats the task znode by upping its version
synchronized (grabTaskLock) { synchronized (grabTaskLock) {
if (workerInGrabTask) { if (workerInGrabTask) {

View File

@@ -671,14 +671,15 @@ public class HLogSplitter {
return String.format("%019d", seqid); return String.format("%019d", seqid);
} }
/* /**
* Parse a single hlog and put the edits in @splitLogsMap * Parse a single hlog and put the edits in entryBuffers
* *
* @param logfile to split * @param in the hlog reader
* @param splitLogsMap output parameter: a map with region names as keys and a * @param path the path of the log file
* list of edits as values * @param entryBuffers the buffer to hold the parsed edits
* @param fs the filesystem * @param fs the file system
* @param conf the configuration * @param conf the configuration
* @param skipErrors indicator if CorruptedLogFileException should be thrown instead of IOException
* @throws IOException * @throws IOException
* @throws CorruptedLogFileException if hlog is corrupted * @throws CorruptedLogFileException if hlog is corrupted
*/ */

View File

@@ -275,7 +275,7 @@ public class TestDistributedLogSplitting {
// slm.splitLogDistributed(logDir); // slm.splitLogDistributed(logDir);
FileStatus[] logfiles = fs.listStatus(logDir); FileStatus[] logfiles = fs.listStatus(logDir);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
slm.installTask(logfiles[0].getPath().toString(), batch); slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
//waitForCounter but for one of the 2 counters //waitForCounter but for one of the 2 counters
long curt = System.currentTimeMillis(); long curt = System.currentTimeMillis();
long endt = curt + 30000; long endt = curt + 30000;
@@ -372,13 +372,9 @@ public class TestDistributedLogSplitting {
byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i)); byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, family, qualifier, e.add(new KeyValue(row, family, qualifier,
System.currentTimeMillis(), value)); System.currentTimeMillis(), value));
// LOG.info("Region " + i + ": " + e);
j++; j++;
log.append(hris.get(j % n), table, e, System.currentTimeMillis(), htd); log.append(hris.get(j % n), table, e, System.currentTimeMillis(), htd);
counts[j % n] += 1; counts[j % n] += 1;
// if ((i % 8096) == 0) {
// log.sync();
// }
} }
} }
log.sync(); log.sync();

View File

@@ -138,7 +138,7 @@ public class TestSplitLogManager {
zkw.registerListener(listener); zkw.registerListener(listener);
ZKUtil.watchAndCheckExists(zkw, tasknode); ZKUtil.watchAndCheckExists(zkw, tasknode);
slm.installTask(name, batch); slm.enqueueSplitTask(name, batch);
assertEquals(1, batch.installed); assertEquals(1, batch.installed);
assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
assertEquals(1L, tot_mgr_node_create_queued.get()); assertEquals(1L, tot_mgr_node_create_queued.get());