HBASE-5081 Distributed log splitting deleteNode races against splitLog retry (Prakash)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1227951 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-01-06 03:53:58 +00:00
parent c3d7b8ec19
commit ff3cf1aeda
7 changed files with 344 additions and 239 deletions

View File

@ -470,6 +470,7 @@ Release 0.92.0 - Unreleased
are shutdown at the same time (Ming Ma)
HBASE-5094 The META can hold an entry for a region with a different server name from the one
actually in the AssignmentManager thus making the region inaccessible. (Ram)
HBASE-5081 Distributed log splitting deleteNode races against splitLog retry (Prakash)
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@ -270,12 +270,7 @@ public class MasterFileSystem {
if (distributedLogSplitting) {
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
try {
splitLogSize = splitLogManager.splitLogDistributed(logDirs);
} catch (OrphanHLogAfterSplitException e) {
LOG.warn("Retrying distributed splitting for " + serverNames, e);
splitLogManager.splitLogDistributed(logDirs);
}
splitLogSize = splitLogManager.splitLogDistributed(logDirs);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
} else {
for(Path logDir: logDirs){

View File

@ -107,6 +107,7 @@ public class SplitLogManager extends ZooKeeperListener {
private long timeout;
private long unassignedTimeout;
private long lastNodeCreateTime = Long.MAX_VALUE;
public boolean ignoreZKDeleteForTesting = false;
private ConcurrentMap<String, Task> tasks =
new ConcurrentHashMap<String, Task>();
@ -116,10 +117,12 @@ public class SplitLogManager extends ZooKeeperListener {
private Object deadWorkersLock = new Object();
/**
* Its OK to construct this object even when region-servers are not online. It
* does lookup the orphan tasks in zk but it doesn't block for them to be
* done.
*
* Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
* Stoppable, String, TaskFinisher)} that provides a task finisher for
* copying recovered edits to their final destination. The task finisher
* has to be robust because it can be arbitrarily restarted or called
* multiple times.
*
* @param zkw
* @param conf
* @param stopper
@ -142,6 +145,18 @@ public class SplitLogManager extends ZooKeeperListener {
}
});
}
/**
* Its OK to construct this object even when region-servers are not online. It
* does lookup the orphan tasks in zk but it doesn't block waiting for them
* to be done.
*
* @param zkw
* @param conf
* @param stopper
* @param serverName
* @param tf task finisher
*/
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
Stoppable stopper, String serverName, TaskFinisher tf) {
super(zkw);
@ -194,8 +209,6 @@ public class SplitLogManager extends ZooKeeperListener {
fileStatus.add(status);
}
}
if (fileStatus.isEmpty())
return null;
FileStatus[] a = new FileStatus[fileStatus.size()];
return fileStatus.toArray(a);
}
@ -228,8 +241,6 @@ public class SplitLogManager extends ZooKeeperListener {
MonitoredTask status = TaskMonitor.get().createStatus(
"Doing distributed log split in " + logDirs);
FileStatus[] logfiles = getFileList(logDirs);
if(logfiles == null)
return 0;
status.setStatus("Checking directory contents...");
LOG.debug("Scheduling batch of logs to split");
tot_mgr_log_split_batch_start.incrementAndGet();
@ -251,7 +262,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
waitForSplittingCompletion(batch, status);
if (batch.done != batch.installed) {
stopTrackingTasks(batch);
batch.isDead = true;
tot_mgr_log_split_batch_err.incrementAndGet();
LOG.warn("error while splitting logs in " + logDirs +
" installed = " + batch.installed + " but only " + batch.done + " done");
@ -259,17 +270,21 @@ public class SplitLogManager extends ZooKeeperListener {
+ logDirs + " Task = " + batch);
}
for(Path logDir: logDirs){
if (anyNewLogFiles(logDir, logfiles)) {
tot_mgr_new_unexpected_hlogs.incrementAndGet();
LOG.warn("new hlogs were produced while logs in " + logDir +
" were being split");
throw new OrphanHLogAfterSplitException();
status.setStatus("Cleaning up log directory...");
try {
if (fs.exists(logDir) && !fs.delete(logDir, false)) {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
}
} catch (IOException ioe) {
FileStatus[] files = fs.listStatus(logDir);
if (files != null && files.length > 0) {
LOG.warn("returning success without actually splitting and " +
"deleting all the log files in path " + logDir);
} else {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
}
}
tot_mgr_log_split_batch_success.incrementAndGet();
status.setStatus("Cleaning up log directory...");
if (!fs.delete(logDir, true)) {
throw new IOException("Unable to delete src dir: " + logDir);
}
}
String msg = "finished splitting (more than or equal to) " + totalSize +
" bytes in " + batch.installed + " log files in " + logDirs + " in " +
@ -295,8 +310,6 @@ public class SplitLogManager extends ZooKeeperListener {
createNode(path, zkretries);
return true;
}
LOG.warn(path + "is already being split. " +
"Two threads cannot wait for the same task");
return false;
}
@ -323,15 +336,6 @@ public class SplitLogManager extends ZooKeeperListener {
}
private void setDone(String path, TerminationStatus status) {
if (!ZKSplitLog.isRescanNode(watcher, path)) {
if (status == SUCCESS) {
tot_mgr_log_split_success.incrementAndGet();
LOG.info("Done splitting " + path);
} else {
tot_mgr_log_split_err.incrementAndGet();
LOG.warn("Error splitting " + path);
}
}
Task task = tasks.get(path);
if (task == null) {
if (!ZKSplitLog.isRescanNode(watcher, path)) {
@ -340,18 +344,24 @@ public class SplitLogManager extends ZooKeeperListener {
}
} else {
synchronized (task) {
task.deleted = true;
// if in stopTrackingTasks() we were to make tasks orphan instead of
// forgetting about them then we will have to handle the race when
// accessing task.batch here.
if (!task.isOrphan()) {
synchronized (task.batch) {
if (status == SUCCESS) {
task.batch.done++;
} else {
task.batch.error++;
if (task.status == IN_PROGRESS) {
if (status == SUCCESS) {
tot_mgr_log_split_success.incrementAndGet();
LOG.info("Done splitting " + path);
} else {
tot_mgr_log_split_err.incrementAndGet();
LOG.warn("Error splitting " + path);
}
task.status = status;
if (task.batch != null) {
synchronized (task.batch) {
if (status == SUCCESS) {
task.batch.done++;
} else {
task.batch.error++;
}
task.batch.notify();
}
task.batch.notify();
}
}
}
@ -394,6 +404,11 @@ public class SplitLogManager extends ZooKeeperListener {
private void getDataSetWatchSuccess(String path, byte[] data, int version) {
if (data == null) {
if (version == Integer.MIN_VALUE) {
// assume all done. The task znode suddenly disappeared.
setDone(path, SUCCESS);
return;
}
tot_mgr_null_data.incrementAndGet();
LOG.fatal("logic error - got null data " + path);
setDone(path, FAILURE);
@ -480,7 +495,7 @@ public class SplitLogManager extends ZooKeeperListener {
ResubmitDirective directive) {
// its ok if this thread misses the update to task.deleted. It will
// fail later
if (task.deleted) {
if (task.status != IN_PROGRESS) {
return false;
}
int version;
@ -490,7 +505,8 @@ public class SplitLogManager extends ZooKeeperListener {
return false;
}
if (task.unforcedResubmits >= resubmit_threshold) {
if (task.unforcedResubmits == resubmit_threshold) {
if (!task.resubmitThresholdReached) {
task.resubmitThresholdReached = true;
tot_mgr_resubmit_threshold_reached.incrementAndGet();
LOG.info("Skipping resubmissions of task " + path +
" because threshold " + resubmit_threshold + " reached");
@ -514,7 +530,9 @@ public class SplitLogManager extends ZooKeeperListener {
return false;
}
} catch (NoNodeException e) {
LOG.debug("failed to resubmit " + path + " task done");
LOG.warn("failed to resubmit because znode doesn't exist " + path +
" task done (or forced done by removing the znode)");
getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
return false;
} catch (KeeperException e) {
tot_mgr_resubmit_failed.incrementAndGet();
@ -539,12 +557,18 @@ public class SplitLogManager extends ZooKeeperListener {
private void deleteNode(String path, Long retries) {
tot_mgr_node_delete_queued.incrementAndGet();
// Once a task znode is ready for delete, that is it is in the TASK_DONE
// state, then no one should be writing to it anymore. That is no one
// will be updating the znode version any more.
this.watcher.getRecoverableZooKeeper().getZooKeeper().
delete(path, -1, new DeleteAsyncCallback(),
retries);
}
private void deleteNodeSuccess(String path) {
if (ignoreZKDeleteForTesting) {
return;
}
Task task;
task = tasks.remove(path);
if (task == null) {
@ -555,6 +579,10 @@ public class SplitLogManager extends ZooKeeperListener {
LOG.debug("deleted task without in memory state " + path);
return;
}
synchronized (task) {
task.status = DELETED;
task.notify();
}
tot_mgr_task_deleted.incrementAndGet();
}
@ -603,59 +631,67 @@ public class SplitLogManager extends ZooKeeperListener {
Task oldtask;
// batch.installed is only changed via this function and
// a single thread touches batch.installed.
oldtask = tasks.putIfAbsent(path, new Task(batch));
if (oldtask != null) {
// new task was not used.
batch.installed--;
synchronized (oldtask) {
if (oldtask.isOrphan()) {
if (oldtask.deleted) {
// The task is already done. Do not install the batch for this
// task because it might be too late for setDone() to update
// batch.done. There is no need for the batch creator to wait for
// this task to complete.
return (null);
}
oldtask.setBatch(batch);
}
}
LOG.info("Previously orphan task " + path +
" is now being waited upon");
return (null);
Task newtask = new Task();
newtask.batch = batch;
oldtask = tasks.putIfAbsent(path, newtask);
if (oldtask == null) {
batch.installed++;
return null;
}
return oldtask;
}
/**
* This function removes any knowledge of this batch's tasks from the
* manager. It doesn't actually stop the active tasks. If the tasks are
* resubmitted then the active tasks will be reacquired and monitored by the
* manager. It is important to call this function when batch processing
* terminates prematurely, otherwise if the tasks are re-submitted
* then they might fail.
* <p>
* there is a slight race here. even after a task has been removed from
* {@link #tasks} someone who had acquired a reference to it will continue to
* process the task. That is OK since we don't actually change the task and
* the batch objects.
* <p>
* TODO Its probably better to convert these to orphan tasks but then we
* have to deal with race conditions as we nullify Task's batch pointer etc.
* <p>
* @param batch
*/
void stopTrackingTasks(TaskBatch batch) {
for (Map.Entry<String, Task> e : tasks.entrySet()) {
String path = e.getKey();
Task t = e.getValue();
if (t.batch == batch) { // == is correct. equals not necessary.
tasks.remove(path);
// new task was not used.
synchronized (oldtask) {
if (oldtask.isOrphan()) {
if (oldtask.status == SUCCESS) {
// The task is already done. Do not install the batch for this
// task because it might be too late for setDone() to update
// batch.done. There is no need for the batch creator to wait for
// this task to complete.
return (null);
}
if (oldtask.status == IN_PROGRESS) {
oldtask.batch = batch;
batch.installed++;
LOG.debug("Previously orphan task " + path +
" is now being waited upon");
return null;
}
while (oldtask.status == FAILURE) {
LOG.debug("wait for status of task " + path +
" to change to DELETED");
tot_mgr_wait_for_zk_delete.incrementAndGet();
try {
oldtask.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted when waiting for znode delete callback");
// fall through to return failure
break;
}
}
if (oldtask.status != DELETED) {
LOG.warn("Failure because previously failed task" +
" state still present. Waiting for znode delete callback" +
" path=" + path);
return oldtask;
}
// reinsert the newTask and it must succeed this time
Task t = tasks.putIfAbsent(path, newtask);
if (t == null) {
batch.installed++;
return null;
}
LOG.fatal("Logic error. Deleted task still present in tasks map");
assert false : "Deleted task still present in tasks map";
return t;
}
LOG.warn("Failure because two threads can't wait for the same task. " +
" path=" + path);
return oldtask;
}
}
Task findOrCreateOrphanTask(String path) {
Task orphanTask = new Task(null);
Task orphanTask = new Task();
Task task;
task = tasks.putIfAbsent(path, orphanTask);
if (task == null) {
@ -716,9 +752,10 @@ public class SplitLogManager extends ZooKeeperListener {
* All access is synchronized.
*/
static class TaskBatch {
int installed;
int done;
int error;
int installed = 0;
int done = 0;
int error = 0;
volatile boolean isDead = false;
@Override
public String toString() {
@ -731,45 +768,35 @@ public class SplitLogManager extends ZooKeeperListener {
* in memory state of an active task.
*/
static class Task {
long last_update;
int last_version;
String cur_worker_name;
volatile long last_update;
volatile int last_version;
volatile String cur_worker_name;
TaskBatch batch;
boolean deleted;
int incarnation;
int unforcedResubmits;
volatile TerminationStatus status;
volatile int incarnation;
volatile int unforcedResubmits;
volatile boolean resubmitThresholdReached;
@Override
public String toString() {
return ("last_update = " + last_update +
" last_version = " + last_version +
" cur_worker_name = " + cur_worker_name +
" deleted = " + deleted +
" status = " + status +
" incarnation = " + incarnation +
" resubmits = " + unforcedResubmits +
" batch = " + batch);
}
Task(TaskBatch tb) {
Task() {
incarnation = 0;
last_version = -1;
deleted = false;
setBatch(tb);
status = IN_PROGRESS;
setUnassigned();
}
public void setBatch(TaskBatch batch) {
if (batch != null && this.batch != null) {
LOG.fatal("logic error - batch being overwritten");
}
this.batch = batch;
if (batch != null) {
batch.installed++;
}
}
public boolean isOrphan() {
return (batch == null);
return (batch == null || batch.isDead);
}
public boolean isUnassigned() {
@ -882,6 +909,16 @@ public class SplitLogManager extends ZooKeeperListener {
if (tot > 0 && !found_assigned_task &&
((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
unassignedTimeout)) {
for (Map.Entry<String, Task> e : tasks.entrySet()) {
String path = e.getKey();
Task task = e.getValue();
// we have to do this check again because tasks might have
// been asynchronously assigned.
if (task.isUnassigned()) {
// We just touch the znode to make sure its still there
getDataSetWatch(path, zkretries);
}
}
createRescanNode(Long.MAX_VALUE);
tot_mgr_resubmit_unassigned.incrementAndGet();
LOG.debug("resubmitting unassigned task(s) after timeout");
@ -901,6 +938,12 @@ public class SplitLogManager extends ZooKeeperListener {
tot_mgr_node_create_result.incrementAndGet();
if (rc != 0) {
if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
// What if there is a delete pending against this pre-existing
// znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
// state. Only operations that will be carried out on this node by
// this manager are get-znode-data, task-finisher and delete-znode.
// And all code pieces correctly handle the case of suddenly
// disappearing task-znode.
LOG.debug("found pre-existing znode " + path);
tot_mgr_node_already_exists.incrementAndGet();
} else {
@ -933,6 +976,15 @@ public class SplitLogManager extends ZooKeeperListener {
Stat stat) {
tot_mgr_get_data_result.incrementAndGet();
if (rc != 0) {
if (rc == KeeperException.Code.NONODE.intValue()) {
tot_mgr_get_data_nonode.incrementAndGet();
// The task znode has been deleted. Must be some pending delete
// that deleted the task. Assume success because a task-znode is
// is only deleted after TaskFinisher is successful.
LOG.warn("task znode " + path + " vanished.");
getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
return;
}
Long retry_count = (Long) ctx;
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
path + " remaining retries=" + retry_count);
@ -974,9 +1026,10 @@ public class SplitLogManager extends ZooKeeperListener {
}
return;
} else {
LOG.debug(path
+ " does not exist, either was never created or was deleted"
+ " in earlier rounds, zkretries = " + (Long) ctx);
LOG.debug(path +
" does not exist. Either was created but deleted behind our" +
" back by another pending delete OR was deleted" +
" in earlier retry rounds. zkretries = " + (Long) ctx);
}
} else {
LOG.debug("deleted " + path);
@ -1013,47 +1066,11 @@ public class SplitLogManager extends ZooKeeperListener {
}
}
/**
* checks whether any new files have appeared in logDir which were
* not present in the original logfiles set
* @param logdir
* @param logfiles
* @return True if a new log file is found
* @throws IOException
*/
public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles)
throws IOException {
if (logdir == null) {
return false;
}
LOG.debug("re-listing " + logdir);
tot_mgr_relist_logdir.incrementAndGet();
FileStatus[] newfiles = FSUtils.listStatus(fs, logdir, null);
if (newfiles == null) {
return false;
}
boolean matched;
for (FileStatus newfile : newfiles) {
matched = false;
for (FileStatus origfile : logfiles) {
if (origfile.equals(newfile)) {
matched = true;
break;
}
}
if (matched == false) {
LOG.warn("Discovered orphan hlog " + newfile + " after split." +
" Maybe HRegionServer was not dead when we started");
return true;
}
}
return false;
}
/**
* {@link SplitLogManager} can use objects implementing this interface to
* finish off a partially done task by {@link SplitLogWorker}. This provides
* a serialization point at the end of the task processing.
* a serialization point at the end of the task processing. Must be
* restartable and idempotent.
*/
static public interface TaskFinisher {
/**
@ -1085,7 +1102,19 @@ public class SplitLogManager extends ZooKeeperListener {
FORCE();
}
enum TerminationStatus {
SUCCESS(),
FAILURE();
IN_PROGRESS("in_progress"),
SUCCESS("success"),
FAILURE("failure"),
DELETED("deleted");
String statusMsg;
TerminationStatus(String msg) {
statusMsg = msg;
}
@Override
public String toString() {
return statusMsg;
}
}
}

View File

@ -541,16 +541,21 @@ public class HLogSplitter {
if (ZKSplitLog.isCorruptFlagFile(dst)) {
continue;
}
if (fs.exists(dst)) {
fs.delete(dst, false);
} else {
Path dstdir = dst.getParent();
if (!fs.exists(dstdir)) {
if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
if (fs.exists(src)) {
if (fs.exists(dst)) {
fs.delete(dst, false);
} else {
Path dstdir = dst.getParent();
if (!fs.exists(dstdir)) {
if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
}
}
fs.rename(src, dst);
LOG.debug(" moved " + src + " => " + dst);
} else {
LOG.debug("Could not move recovered edits from " + src +
" as it doesn't exist");
}
fs.rename(src, dst);
LOG.debug(" moved " + src + " => " + dst);
}
archiveLogs(null, corruptedLogs, processedLogs,
oldLogDir, fs, conf);
@ -600,24 +605,32 @@ public class HLogSplitter {
}
fs.mkdirs(oldLogDir);
// this method can get restarted or called multiple times for archiving
// the same log files.
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
if (!fs.rename(corrupted, p)) {
LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
} else {
LOG.info("Moving corrupted log " + corrupted + " to " + p);
if (fs.exists(corrupted)) {
if (!fs.rename(corrupted, p)) {
LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
} else {
LOG.warn("Moving corrupted log " + corrupted + " to " + p);
}
}
}
for (Path p : processedLogs) {
Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
if (!fs.rename(p, newPath)) {
LOG.info("Unable to move " + p + " to " + newPath);
} else {
LOG.info("Archived processed log " + p + " to " + newPath);
if (fs.exists(p)) {
if (!fs.rename(p, newPath)) {
LOG.warn("Unable to move " + p + " to " + newPath);
} else {
LOG.debug("Archived processed log " + p + " to " + newPath);
}
}
}
// distributed log splitting removes the srcDir (region's log dir) later
// when all the log files in that srcDir have been successfully processed
if (srcDir != null && !fs.delete(srcDir, true)) {
throw new IOException("Unable to delete src dir: " + srcDir);
}

View File

@ -215,6 +215,7 @@ public class ZKSplitLog {
public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
@ -224,6 +225,7 @@ public class ZKSplitLog {
public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
public static AtomicLong tot_mgr_resubmit_threshold_reached =
new AtomicLong(0);

View File

@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -142,51 +143,6 @@ public class TestDistributedLogSplitting {
ht.close();
}
@Test(expected=OrphanHLogAfterSplitException.class, timeout=300000)
public void testOrphanLogCreation() throws Exception {
LOG.info("testOrphanLogCreation");
startCluster(NUM_RS);
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
HRegionServer hrs = rsts.get(0).getRegionServer();
Path rootdir = FSUtils.getRootDir(conf);
final Path logDir = new Path(rootdir,
HLog.getHLogDirectoryName(hrs.getServerName().toString()));
installTable(new ZooKeeperWatcher(conf, "table-creation", null),
"table", "family", 40);
makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
1000, 100);
new Thread() {
public void run() {
while (true) {
int i = 0;
try {
while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
0) {
Thread.yield();
}
fs.createNewFile(new Path(logDir, "foo" + i++));
} catch (Exception e) {
LOG.debug("file creation failed", e);
return;
}
}
}
}.start();
slm.splitLogDistributed(logDir);
FileStatus[] files = fs.listStatus(logDir);
if (files != null) {
for (FileStatus file : files) {
LOG.debug("file still there " + file.getPath());
}
}
}
@Test (timeout=300000)
public void testRecoveredEdits() throws Exception {
LOG.info("testRecoveredEdits");
@ -309,6 +265,45 @@ public class TestDistributedLogSplitting {
"tot_wkr_preempt_task");
}
@Test
public void testDelayedDeleteOnFailure() throws Exception {
LOG.info("testDelayedDeleteOnFailure");
startCluster(1);
final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
fs.mkdirs(logDir);
final Path corruptedLogFile = new Path(logDir, "x");
FSDataOutputStream out;
out = fs.create(corruptedLogFile);
out.write(0);
out.write(Bytes.toBytes("corrupted bytes"));
out.close();
slm.ignoreZKDeleteForTesting = true;
Thread t = new Thread() {
@Override
public void run() {
try {
slm.splitLogDistributed(logDir);
} catch (IOException ioe) {
try {
assertTrue(fs.exists(corruptedLogFile));
slm.splitLogDistributed(logDir);
} catch (IOException e) {
assertTrue(Thread.currentThread().isInterrupted());
return;
}
fail("did not get the expected IOException from the 2nd call");
}
fail("did not get the expected IOException from the 1st call");
}
};
t.start();
waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
t.interrupt();
t.join();
}
HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
int nrs ) throws Exception {
// Create a table with regions

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
@ -31,6 +32,7 @@ import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
@ -112,19 +114,34 @@ public class TestSplitLogManager {
TEST_UTIL.shutdownMiniZKCluster();
}
private void waitForCounter(AtomicLong ctr, long oldval, long newval,
private interface Expr {
public long eval();
}
private void waitForCounter(final AtomicLong ctr, long oldval, long newval,
long timems) {
Expr e = new Expr() {
public long eval() {
return ctr.get();
}
};
waitForCounter(e, oldval, newval, timems);
return;
}
private void waitForCounter(Expr e, long oldval, long newval,
long timems) {
long curt = System.currentTimeMillis();
long endt = curt + timems;
while (curt < endt) {
if (ctr.get() == oldval) {
if (e.eval() == oldval) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
} catch (InterruptedException eintr) {
}
curt = System.currentTimeMillis();
} else {
assertEquals(newval, ctr.get());
assertEquals(newval, e.eval());
return;
}
}
@ -267,10 +284,8 @@ public class TestSplitLogManager {
public void testRescanCleanup() throws Exception {
LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
int to = 1000;
conf.setInt("hbase.splitlog.manager.timeout", to);
conf.setInt("hbase.splitlog.manager.timeout", 1000);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
to = to + 2 * 100;
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
@ -280,14 +295,23 @@ public class TestSplitLogManager {
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
taskstate));
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
waitForCounter(new Expr() {
@Override
public long eval() {
return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
}
}, 0, 1, 5*60000); // wait long enough
if (tot_mgr_resubmit_failed.get() == 0) {
int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode);
assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
taskstate));
waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
} else {
LOG.warn("Could not run test. Lost ZK connection?");
}
return;
}
@ -419,6 +443,52 @@ public class TestSplitLogManager {
return;
}
@Test
public void testEmptyLogDir() throws Exception {
LOG.info("testEmptyLogDir");
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
slm.finishInitialization();
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
UUID.randomUUID().toString());
fs.mkdirs(emptyLogDirPath);
slm.splitLogDistributed(emptyLogDirPath);
assertFalse(fs.exists(emptyLogDirPath));
}
@Test
public void testVanishingTaskZNode() throws Exception {
LOG.info("testVanishingTaskZNode");
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
slm.finishInitialization();
FileSystem fs = TEST_UTIL.getTestFileSystem();
final Path logDir = new Path(fs.getWorkingDirectory(),
UUID.randomUUID().toString());
fs.mkdirs(logDir);
Path logFile = new Path(logDir, UUID.randomUUID().toString());
fs.createNewFile(logFile);
new Thread() {
public void run() {
try {
// this call will block because there are no SplitLogWorkers
slm.splitLogDistributed(logDir);
} catch (Exception e) {
LOG.warn("splitLogDistributed failed", e);
fail();
}
}
}.start();
waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
// remove the task znode
ZKUtil.deleteNode(zkw, znode);
waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000);
assertTrue(fs.exists(logFile));
fs.delete(logDir, true);
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();