diff --git a/CHANGES.txt b/CHANGES.txt index 1d7238e01cf..31b8065529e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -470,6 +470,7 @@ Release 0.92.0 - Unreleased are shutdown at the same time (Ming Ma) HBASE-5094 The META can hold an entry for a region with a different server name from the one actually in the AssignmentManager thus making the region inaccessible. (Ram) + HBASE-5081 Distributed log splitting deleteNode races against splitLog retry (Prakash) TESTS HBASE-4450 test for number of blocks read: to serve as baseline for expected diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 3938fa74f7a..548a5be9668 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -270,12 +270,7 @@ public class MasterFileSystem { if (distributedLogSplitting) { splitLogManager.handleDeadWorkers(serverNames); splitTime = EnvironmentEdgeManager.currentTimeMillis(); - try { - splitLogSize = splitLogManager.splitLogDistributed(logDirs); - } catch (OrphanHLogAfterSplitException e) { - LOG.warn("Retrying distributed splitting for " + serverNames, e); - splitLogManager.splitLogDistributed(logDirs); - } + splitLogSize = splitLogManager.splitLogDistributed(logDirs); splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime; } else { for(Path logDir: logDirs){ diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 667a8b18e7f..0ef0e336846 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -107,6 +107,7 @@ public class SplitLogManager extends ZooKeeperListener { private long timeout; private long unassignedTimeout; private long lastNodeCreateTime = Long.MAX_VALUE; + public boolean ignoreZKDeleteForTesting = false; private ConcurrentMap tasks = new ConcurrentHashMap(); @@ -116,10 +117,12 @@ public class SplitLogManager extends ZooKeeperListener { private Object deadWorkersLock = new Object(); /** - * Its OK to construct this object even when region-servers are not online. It - * does lookup the orphan tasks in zk but it doesn't block for them to be - * done. - * + * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, + * Stoppable, String, TaskFinisher)} that provides a task finisher for + * copying recovered edits to their final destination. The task finisher + * has to be robust because it can be arbitrarily restarted or called + * multiple times. + * * @param zkw * @param conf * @param stopper @@ -142,6 +145,18 @@ public class SplitLogManager extends ZooKeeperListener { } }); } + + /** + * Its OK to construct this object even when region-servers are not online. It + * does lookup the orphan tasks in zk but it doesn't block waiting for them + * to be done. + * + * @param zkw + * @param conf + * @param stopper + * @param serverName + * @param tf task finisher + */ public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper, String serverName, TaskFinisher tf) { super(zkw); @@ -194,8 +209,6 @@ public class SplitLogManager extends ZooKeeperListener { fileStatus.add(status); } } - if (fileStatus.isEmpty()) - return null; FileStatus[] a = new FileStatus[fileStatus.size()]; return fileStatus.toArray(a); } @@ -228,8 +241,6 @@ public class SplitLogManager extends ZooKeeperListener { MonitoredTask status = TaskMonitor.get().createStatus( "Doing distributed log split in " + logDirs); FileStatus[] logfiles = getFileList(logDirs); - if(logfiles == null) - return 0; status.setStatus("Checking directory contents..."); LOG.debug("Scheduling batch of logs to split"); tot_mgr_log_split_batch_start.incrementAndGet(); @@ -251,7 +262,7 @@ public class SplitLogManager extends ZooKeeperListener { } waitForSplittingCompletion(batch, status); if (batch.done != batch.installed) { - stopTrackingTasks(batch); + batch.isDead = true; tot_mgr_log_split_batch_err.incrementAndGet(); LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); @@ -259,17 +270,21 @@ public class SplitLogManager extends ZooKeeperListener { + logDirs + " Task = " + batch); } for(Path logDir: logDirs){ - if (anyNewLogFiles(logDir, logfiles)) { - tot_mgr_new_unexpected_hlogs.incrementAndGet(); - LOG.warn("new hlogs were produced while logs in " + logDir + - " were being split"); - throw new OrphanHLogAfterSplitException(); + status.setStatus("Cleaning up log directory..."); + try { + if (fs.exists(logDir) && !fs.delete(logDir, false)) { + LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); + } + } catch (IOException ioe) { + FileStatus[] files = fs.listStatus(logDir); + if (files != null && files.length > 0) { + LOG.warn("returning success without actually splitting and " + + "deleting all the log files in path " + logDir); + } else { + LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); + } } tot_mgr_log_split_batch_success.incrementAndGet(); - status.setStatus("Cleaning up log directory..."); - if (!fs.delete(logDir, true)) { - throw new IOException("Unable to delete src dir: " + logDir); - } } String msg = "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed + " log files in " + logDirs + " in " + @@ -295,8 +310,6 @@ public class SplitLogManager extends ZooKeeperListener { createNode(path, zkretries); return true; } - LOG.warn(path + "is already being split. " + - "Two threads cannot wait for the same task"); return false; } @@ -323,15 +336,6 @@ public class SplitLogManager extends ZooKeeperListener { } private void setDone(String path, TerminationStatus status) { - if (!ZKSplitLog.isRescanNode(watcher, path)) { - if (status == SUCCESS) { - tot_mgr_log_split_success.incrementAndGet(); - LOG.info("Done splitting " + path); - } else { - tot_mgr_log_split_err.incrementAndGet(); - LOG.warn("Error splitting " + path); - } - } Task task = tasks.get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { @@ -340,18 +344,24 @@ public class SplitLogManager extends ZooKeeperListener { } } else { synchronized (task) { - task.deleted = true; - // if in stopTrackingTasks() we were to make tasks orphan instead of - // forgetting about them then we will have to handle the race when - // accessing task.batch here. - if (!task.isOrphan()) { - synchronized (task.batch) { - if (status == SUCCESS) { - task.batch.done++; - } else { - task.batch.error++; + if (task.status == IN_PROGRESS) { + if (status == SUCCESS) { + tot_mgr_log_split_success.incrementAndGet(); + LOG.info("Done splitting " + path); + } else { + tot_mgr_log_split_err.incrementAndGet(); + LOG.warn("Error splitting " + path); + } + task.status = status; + if (task.batch != null) { + synchronized (task.batch) { + if (status == SUCCESS) { + task.batch.done++; + } else { + task.batch.error++; + } + task.batch.notify(); } - task.batch.notify(); } } } @@ -394,6 +404,11 @@ public class SplitLogManager extends ZooKeeperListener { private void getDataSetWatchSuccess(String path, byte[] data, int version) { if (data == null) { + if (version == Integer.MIN_VALUE) { + // assume all done. The task znode suddenly disappeared. + setDone(path, SUCCESS); + return; + } tot_mgr_null_data.incrementAndGet(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); @@ -480,7 +495,7 @@ public class SplitLogManager extends ZooKeeperListener { ResubmitDirective directive) { // its ok if this thread misses the update to task.deleted. It will // fail later - if (task.deleted) { + if (task.status != IN_PROGRESS) { return false; } int version; @@ -490,7 +505,8 @@ public class SplitLogManager extends ZooKeeperListener { return false; } if (task.unforcedResubmits >= resubmit_threshold) { - if (task.unforcedResubmits == resubmit_threshold) { + if (!task.resubmitThresholdReached) { + task.resubmitThresholdReached = true; tot_mgr_resubmit_threshold_reached.incrementAndGet(); LOG.info("Skipping resubmissions of task " + path + " because threshold " + resubmit_threshold + " reached"); @@ -514,7 +530,9 @@ public class SplitLogManager extends ZooKeeperListener { return false; } } catch (NoNodeException e) { - LOG.debug("failed to resubmit " + path + " task done"); + LOG.warn("failed to resubmit because znode doesn't exist " + path + + " task done (or forced done by removing the znode)"); + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); return false; } catch (KeeperException e) { tot_mgr_resubmit_failed.incrementAndGet(); @@ -539,12 +557,18 @@ public class SplitLogManager extends ZooKeeperListener { private void deleteNode(String path, Long retries) { tot_mgr_node_delete_queued.incrementAndGet(); + // Once a task znode is ready for delete, that is it is in the TASK_DONE + // state, then no one should be writing to it anymore. That is no one + // will be updating the znode version any more. this.watcher.getRecoverableZooKeeper().getZooKeeper(). delete(path, -1, new DeleteAsyncCallback(), retries); } private void deleteNodeSuccess(String path) { + if (ignoreZKDeleteForTesting) { + return; + } Task task; task = tasks.remove(path); if (task == null) { @@ -555,6 +579,10 @@ public class SplitLogManager extends ZooKeeperListener { LOG.debug("deleted task without in memory state " + path); return; } + synchronized (task) { + task.status = DELETED; + task.notify(); + } tot_mgr_task_deleted.incrementAndGet(); } @@ -603,59 +631,67 @@ public class SplitLogManager extends ZooKeeperListener { Task oldtask; // batch.installed is only changed via this function and // a single thread touches batch.installed. - oldtask = tasks.putIfAbsent(path, new Task(batch)); - if (oldtask != null) { - // new task was not used. - batch.installed--; - synchronized (oldtask) { - if (oldtask.isOrphan()) { - if (oldtask.deleted) { - // The task is already done. Do not install the batch for this - // task because it might be too late for setDone() to update - // batch.done. There is no need for the batch creator to wait for - // this task to complete. - return (null); - } - oldtask.setBatch(batch); - } - } - LOG.info("Previously orphan task " + path + - " is now being waited upon"); - return (null); + Task newtask = new Task(); + newtask.batch = batch; + oldtask = tasks.putIfAbsent(path, newtask); + if (oldtask == null) { + batch.installed++; + return null; } - return oldtask; - } - - /** - * This function removes any knowledge of this batch's tasks from the - * manager. It doesn't actually stop the active tasks. If the tasks are - * resubmitted then the active tasks will be reacquired and monitored by the - * manager. It is important to call this function when batch processing - * terminates prematurely, otherwise if the tasks are re-submitted - * then they might fail. - *

- * there is a slight race here. even after a task has been removed from - * {@link #tasks} someone who had acquired a reference to it will continue to - * process the task. That is OK since we don't actually change the task and - * the batch objects. - *

- * TODO Its probably better to convert these to orphan tasks but then we - * have to deal with race conditions as we nullify Task's batch pointer etc. - *

- * @param batch - */ - void stopTrackingTasks(TaskBatch batch) { - for (Map.Entry e : tasks.entrySet()) { - String path = e.getKey(); - Task t = e.getValue(); - if (t.batch == batch) { // == is correct. equals not necessary. - tasks.remove(path); + // new task was not used. + synchronized (oldtask) { + if (oldtask.isOrphan()) { + if (oldtask.status == SUCCESS) { + // The task is already done. Do not install the batch for this + // task because it might be too late for setDone() to update + // batch.done. There is no need for the batch creator to wait for + // this task to complete. + return (null); + } + if (oldtask.status == IN_PROGRESS) { + oldtask.batch = batch; + batch.installed++; + LOG.debug("Previously orphan task " + path + + " is now being waited upon"); + return null; + } + while (oldtask.status == FAILURE) { + LOG.debug("wait for status of task " + path + + " to change to DELETED"); + tot_mgr_wait_for_zk_delete.incrementAndGet(); + try { + oldtask.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted when waiting for znode delete callback"); + // fall through to return failure + break; + } + } + if (oldtask.status != DELETED) { + LOG.warn("Failure because previously failed task" + + " state still present. Waiting for znode delete callback" + + " path=" + path); + return oldtask; + } + // reinsert the newTask and it must succeed this time + Task t = tasks.putIfAbsent(path, newtask); + if (t == null) { + batch.installed++; + return null; + } + LOG.fatal("Logic error. Deleted task still present in tasks map"); + assert false : "Deleted task still present in tasks map"; + return t; } + LOG.warn("Failure because two threads can't wait for the same task. " + + " path=" + path); + return oldtask; } } Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(null); + Task orphanTask = new Task(); Task task; task = tasks.putIfAbsent(path, orphanTask); if (task == null) { @@ -716,9 +752,10 @@ public class SplitLogManager extends ZooKeeperListener { * All access is synchronized. */ static class TaskBatch { - int installed; - int done; - int error; + int installed = 0; + int done = 0; + int error = 0; + volatile boolean isDead = false; @Override public String toString() { @@ -731,45 +768,35 @@ public class SplitLogManager extends ZooKeeperListener { * in memory state of an active task. */ static class Task { - long last_update; - int last_version; - String cur_worker_name; + volatile long last_update; + volatile int last_version; + volatile String cur_worker_name; TaskBatch batch; - boolean deleted; - int incarnation; - int unforcedResubmits; + volatile TerminationStatus status; + volatile int incarnation; + volatile int unforcedResubmits; + volatile boolean resubmitThresholdReached; @Override public String toString() { return ("last_update = " + last_update + " last_version = " + last_version + " cur_worker_name = " + cur_worker_name + - " deleted = " + deleted + + " status = " + status + " incarnation = " + incarnation + " resubmits = " + unforcedResubmits + " batch = " + batch); } - Task(TaskBatch tb) { + Task() { incarnation = 0; last_version = -1; - deleted = false; - setBatch(tb); + status = IN_PROGRESS; setUnassigned(); } - public void setBatch(TaskBatch batch) { - if (batch != null && this.batch != null) { - LOG.fatal("logic error - batch being overwritten"); - } - this.batch = batch; - if (batch != null) { - batch.installed++; - } - } - public boolean isOrphan() { - return (batch == null); + return (batch == null || batch.isDead); } public boolean isUnassigned() { @@ -882,6 +909,16 @@ public class SplitLogManager extends ZooKeeperListener { if (tot > 0 && !found_assigned_task && ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > unassignedTimeout)) { + for (Map.Entry e : tasks.entrySet()) { + String path = e.getKey(); + Task task = e.getValue(); + // we have to do this check again because tasks might have + // been asynchronously assigned. + if (task.isUnassigned()) { + // We just touch the znode to make sure its still there + getDataSetWatch(path, zkretries); + } + } createRescanNode(Long.MAX_VALUE); tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); @@ -901,6 +938,12 @@ public class SplitLogManager extends ZooKeeperListener { tot_mgr_node_create_result.incrementAndGet(); if (rc != 0) { if (rc == KeeperException.Code.NODEEXISTS.intValue()) { + // What if there is a delete pending against this pre-existing + // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE + // state. Only operations that will be carried out on this node by + // this manager are get-znode-data, task-finisher and delete-znode. + // And all code pieces correctly handle the case of suddenly + // disappearing task-znode. LOG.debug("found pre-existing znode " + path); tot_mgr_node_already_exists.incrementAndGet(); } else { @@ -933,6 +976,15 @@ public class SplitLogManager extends ZooKeeperListener { Stat stat) { tot_mgr_get_data_result.incrementAndGet(); if (rc != 0) { + if (rc == KeeperException.Code.NONODE.intValue()) { + tot_mgr_get_data_nonode.incrementAndGet(); + // The task znode has been deleted. Must be some pending delete + // that deleted the task. Assume success because a task-znode is + // is only deleted after TaskFinisher is successful. + LOG.warn("task znode " + path + " vanished."); + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + return; + } Long retry_count = (Long) ctx; LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + " remaining retries=" + retry_count); @@ -974,9 +1026,10 @@ public class SplitLogManager extends ZooKeeperListener { } return; } else { - LOG.debug(path - + " does not exist, either was never created or was deleted" - + " in earlier rounds, zkretries = " + (Long) ctx); + LOG.debug(path + + " does not exist. Either was created but deleted behind our" + + " back by another pending delete OR was deleted" + + " in earlier retry rounds. zkretries = " + (Long) ctx); } } else { LOG.debug("deleted " + path); @@ -1013,47 +1066,11 @@ public class SplitLogManager extends ZooKeeperListener { } } - /** - * checks whether any new files have appeared in logDir which were - * not present in the original logfiles set - * @param logdir - * @param logfiles - * @return True if a new log file is found - * @throws IOException - */ - public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles) - throws IOException { - if (logdir == null) { - return false; - } - LOG.debug("re-listing " + logdir); - tot_mgr_relist_logdir.incrementAndGet(); - FileStatus[] newfiles = FSUtils.listStatus(fs, logdir, null); - if (newfiles == null) { - return false; - } - boolean matched; - for (FileStatus newfile : newfiles) { - matched = false; - for (FileStatus origfile : logfiles) { - if (origfile.equals(newfile)) { - matched = true; - break; - } - } - if (matched == false) { - LOG.warn("Discovered orphan hlog " + newfile + " after split." + - " Maybe HRegionServer was not dead when we started"); - return true; - } - } - return false; - } - /** * {@link SplitLogManager} can use objects implementing this interface to * finish off a partially done task by {@link SplitLogWorker}. This provides - * a serialization point at the end of the task processing. + * a serialization point at the end of the task processing. Must be + * restartable and idempotent. */ static public interface TaskFinisher { /** @@ -1085,7 +1102,19 @@ public class SplitLogManager extends ZooKeeperListener { FORCE(); } enum TerminationStatus { - SUCCESS(), - FAILURE(); + IN_PROGRESS("in_progress"), + SUCCESS("success"), + FAILURE("failure"), + DELETED("deleted"); + + String statusMsg; + TerminationStatus(String msg) { + statusMsg = msg; + } + + @Override + public String toString() { + return statusMsg; + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 43bfba0af93..6f9417afa18 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -541,16 +541,21 @@ public class HLogSplitter { if (ZKSplitLog.isCorruptFlagFile(dst)) { continue; } - if (fs.exists(dst)) { - fs.delete(dst, false); - } else { - Path dstdir = dst.getParent(); - if (!fs.exists(dstdir)) { - if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir); + if (fs.exists(src)) { + if (fs.exists(dst)) { + fs.delete(dst, false); + } else { + Path dstdir = dst.getParent(); + if (!fs.exists(dstdir)) { + if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir); + } } + fs.rename(src, dst); + LOG.debug(" moved " + src + " => " + dst); + } else { + LOG.debug("Could not move recovered edits from " + src + + " as it doesn't exist"); } - fs.rename(src, dst); - LOG.debug(" moved " + src + " => " + dst); } archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf); @@ -600,24 +605,32 @@ public class HLogSplitter { } fs.mkdirs(oldLogDir); + // this method can get restarted or called multiple times for archiving + // the same log files. for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (!fs.rename(corrupted, p)) { - LOG.info("Unable to move corrupted log " + corrupted + " to " + p); - } else { - LOG.info("Moving corrupted log " + corrupted + " to " + p); + if (fs.exists(corrupted)) { + if (!fs.rename(corrupted, p)) { + LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); + } else { + LOG.warn("Moving corrupted log " + corrupted + " to " + p); + } } } for (Path p : processedLogs) { Path newPath = HLog.getHLogArchivePath(oldLogDir, p); - if (!fs.rename(p, newPath)) { - LOG.info("Unable to move " + p + " to " + newPath); - } else { - LOG.info("Archived processed log " + p + " to " + newPath); + if (fs.exists(p)) { + if (!fs.rename(p, newPath)) { + LOG.warn("Unable to move " + p + " to " + newPath); + } else { + LOG.debug("Archived processed log " + p + " to " + newPath); + } } } + // distributed log splitting removes the srcDir (region's log dir) later + // when all the log files in that srcDir have been successfully processed if (srcDir != null && !fs.delete(srcDir, true)) { throw new IOException("Unable to delete src dir: " + srcDir); } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 9b83840652c..02034dc8672 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -215,6 +215,7 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); @@ -224,6 +225,7 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); public static AtomicLong tot_mgr_null_data = new AtomicLong(0); public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); + public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0); public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0); public static AtomicLong tot_mgr_resubmit_threshold_reached = new AtomicLong(0); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index a348f0c9f4e..b0487f13aba 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -142,51 +143,6 @@ public class TestDistributedLogSplitting { ht.close(); } - @Test(expected=OrphanHLogAfterSplitException.class, timeout=300000) - public void testOrphanLogCreation() throws Exception { - LOG.info("testOrphanLogCreation"); - startCluster(NUM_RS); - final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; - final FileSystem fs = master.getMasterFileSystem().getFileSystem(); - - List rsts = cluster.getLiveRegionServerThreads(); - HRegionServer hrs = rsts.get(0).getRegionServer(); - Path rootdir = FSUtils.getRootDir(conf); - final Path logDir = new Path(rootdir, - HLog.getHLogDirectoryName(hrs.getServerName().toString())); - - installTable(new ZooKeeperWatcher(conf, "table-creation", null), - "table", "family", 40); - - makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table", - 1000, 100); - - new Thread() { - public void run() { - while (true) { - int i = 0; - try { - while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() == - 0) { - Thread.yield(); - } - fs.createNewFile(new Path(logDir, "foo" + i++)); - } catch (Exception e) { - LOG.debug("file creation failed", e); - return; - } - } - } - }.start(); - slm.splitLogDistributed(logDir); - FileStatus[] files = fs.listStatus(logDir); - if (files != null) { - for (FileStatus file : files) { - LOG.debug("file still there " + file.getPath()); - } - } - } - @Test (timeout=300000) public void testRecoveredEdits() throws Exception { LOG.info("testRecoveredEdits"); @@ -309,6 +265,45 @@ public class TestDistributedLogSplitting { "tot_wkr_preempt_task"); } + @Test + public void testDelayedDeleteOnFailure() throws Exception { + LOG.info("testDelayedDeleteOnFailure"); + startCluster(1); + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + final FileSystem fs = master.getMasterFileSystem().getFileSystem(); + final Path logDir = new Path(FSUtils.getRootDir(conf), "x"); + fs.mkdirs(logDir); + final Path corruptedLogFile = new Path(logDir, "x"); + FSDataOutputStream out; + out = fs.create(corruptedLogFile); + out.write(0); + out.write(Bytes.toBytes("corrupted bytes")); + out.close(); + slm.ignoreZKDeleteForTesting = true; + Thread t = new Thread() { + @Override + public void run() { + try { + slm.splitLogDistributed(logDir); + } catch (IOException ioe) { + try { + assertTrue(fs.exists(corruptedLogFile)); + slm.splitLogDistributed(logDir); + } catch (IOException e) { + assertTrue(Thread.currentThread().isInterrupted()); + return; + } + fail("did not get the expected IOException from the 2nd call"); + } + fail("did not get the expected IOException from the 1st call"); + } + }; + t.start(); + waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); + t.interrupt(); + t.join(); + } + HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs ) throws Exception { // Create a table with regions diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 32ad7e8ffb9..0974b56be52 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; @@ -31,6 +32,7 @@ import static org.junit.Assert.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.master.SplitLogManager.Task; @@ -112,19 +114,34 @@ public class TestSplitLogManager { TEST_UTIL.shutdownMiniZKCluster(); } - private void waitForCounter(AtomicLong ctr, long oldval, long newval, + private interface Expr { + public long eval(); + } + + private void waitForCounter(final AtomicLong ctr, long oldval, long newval, + long timems) { + Expr e = new Expr() { + public long eval() { + return ctr.get(); + } + }; + waitForCounter(e, oldval, newval, timems); + return; + } + + private void waitForCounter(Expr e, long oldval, long newval, long timems) { long curt = System.currentTimeMillis(); long endt = curt + timems; while (curt < endt) { - if (ctr.get() == oldval) { + if (e.eval() == oldval) { try { Thread.sleep(10); - } catch (InterruptedException e) { + } catch (InterruptedException eintr) { } curt = System.currentTimeMillis(); } else { - assertEquals(newval, ctr.get()); + assertEquals(newval, e.eval()); return; } } @@ -267,10 +284,8 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.timeout", 1000); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -280,14 +295,23 @@ public class TestSplitLogManager { ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); - waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); - int version1 = ZKUtil.checkExists(zkw, tasknode); - assertTrue(version1 > version); - byte[] taskstate = ZKUtil.getData(zkw, tasknode); - assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), - taskstate)); - - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + waitForCounter(new Expr() { + @Override + public long eval() { + return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); + } + }, 0, 1, 5*60000); // wait long enough + if (tot_mgr_resubmit_failed.get() == 0) { + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), + taskstate)); + + waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + } else { + LOG.warn("Could not run test. Lost ZK connection?"); + } return; } @@ -419,6 +443,52 @@ public class TestSplitLogManager { return; } + @Test + public void testEmptyLogDir() throws Exception { + LOG.info("testEmptyLogDir"); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(emptyLogDirPath); + slm.splitLogDistributed(emptyLogDirPath); + assertFalse(fs.exists(emptyLogDirPath)); + } + + @Test + public void testVanishingTaskZNode() throws Exception { + LOG.info("testVanishingTaskZNode"); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + final Path logDir = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(logDir); + Path logFile = new Path(logDir, UUID.randomUUID().toString()); + fs.createNewFile(logFile); + new Thread() { + public void run() { + try { + // this call will block because there are no SplitLogWorkers + slm.splitLogDistributed(logDir); + } catch (Exception e) { + LOG.warn("splitLogDistributed failed", e); + fail(); + } + } + }.start(); + waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); + String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); + // remove the task znode + ZKUtil.deleteNode(zkw, znode); + waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); + waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000); + assertTrue(fs.exists(logFile)); + fs.delete(logDir, true); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();