HBASE-5150 Failure in a thread may not fail a test, clean up log splitting test (Jimmy Xiang)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1231364 13f79535-47bb-0310-9956-ffa450edef68
parent 0876497eb9
commit a9c183ff98
@@ -485,7 +485,6 @@ public class SplitLogManager extends ZooKeeperListener {
             new_version, workerName);
         tot_mgr_heartbeat.incrementAndGet();
       } else {
-        assert false;
         LOG.warn("got dup heartbeat for " + path + " ver = " + new_version);
       }
       return;
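Note on the hunk above: the duplicate-heartbeat branch previously hit `assert false` before logging. Java assertions are disabled by default, so in a normal deployment the statement did nothing, but any run with `-ea` would have thrown an AssertionError out of the ZooKeeper callback. A minimal sketch of that behavior (illustration only, not part of the commit; the class name `AssertDemo` is hypothetical):

// Sketch (not from the commit): why `assert false` is risky in server code.
// With assertions enabled (java -ea), a duplicate heartbeat would throw an
// AssertionError out of the callback; with them disabled (the default), the
// statement silently does nothing and only the warning remains.
public class AssertDemo {
  public static void main(String[] args) {
    try {
      assert false : "got dup heartbeat";
      System.out.println("assertions disabled: fell through to the warning");
    } catch (AssertionError e) {
      System.out.println("assertions enabled: callback would die: " + e.getMessage());
    }
  }
}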
@@ -30,6 +30,11 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +50,6 @@ import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -59,9 +63,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 
 @Category(LargeTests.class)
@@ -265,7 +267,7 @@ public class TestDistributedLogSplitting {
         "tot_wkr_preempt_task");
   }
 
-  @Test
+  @Test(timeout=25000)
   public void testDelayedDeleteOnFailure() throws Exception {
     LOG.info("testDelayedDeleteOnFailure");
     startCluster(1);
@@ -273,35 +275,60 @@
     final FileSystem fs = master.getMasterFileSystem().getFileSystem();
     final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
     fs.mkdirs(logDir);
-    final Path corruptedLogFile = new Path(logDir, "x");
-    FSDataOutputStream out;
-    out = fs.create(corruptedLogFile);
-    out.write(0);
-    out.write(Bytes.toBytes("corrupted bytes"));
-    out.close();
-    slm.ignoreZKDeleteForTesting = true;
-    Thread t = new Thread() {
-      @Override
-      public void run() {
-        try {
-          slm.splitLogDistributed(logDir);
-        } catch (IOException ioe) {
-          try {
-            assertTrue(fs.exists(corruptedLogFile));
-            slm.splitLogDistributed(logDir);
-          } catch (IOException e) {
-            assertTrue(Thread.currentThread().isInterrupted());
-            return;
-          }
-          fail("did not get the expected IOException from the 2nd call");
-        }
-        fail("did not get the expected IOException from the 1st call");
-      }
-    };
-    t.start();
-    waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
-    t.interrupt();
-    t.join();
+    ExecutorService executor = null;
+    try {
+      final Path corruptedLogFile = new Path(logDir, "x");
+      FSDataOutputStream out;
+      out = fs.create(corruptedLogFile);
+      out.write(0);
+      out.write(Bytes.toBytes("corrupted bytes"));
+      out.close();
+      slm.ignoreZKDeleteForTesting = true;
+      executor = Executors.newSingleThreadExecutor();
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // since the logDir is a fake, corrupted one, so the split log worker
+            // will finish it quickly with error, and this call will fail and throw
+            // an IOException.
+            slm.splitLogDistributed(logDir);
+          } catch (IOException ioe) {
+            try {
+              assertTrue(fs.exists(corruptedLogFile));
+              // this call will block waiting for the task to be removed from the
+              // tasks map which is not going to happen since ignoreZKDeleteForTesting
+              // is set to true, until it is interrupted.
+              slm.splitLogDistributed(logDir);
+            } catch (IOException e) {
+              assertTrue(Thread.currentThread().isInterrupted());
+              return;
+            }
+            fail("did not get the expected IOException from the 2nd call");
+          }
+          fail("did not get the expected IOException from the 1st call");
+        }
+      };
+      Future<?> result = executor.submit(runnable);
+      try {
+        result.get(2000, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException te) {
+        // it is ok, expected.
+      }
+      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
+      executor.shutdownNow();
+      executor = null;
+
+      // make sure the runnable is finished with no exception thrown.
+      result.get();
+    } finally {
+      if (executor != null) {
+        // interrupt the thread in case the test fails in the middle.
+        // it has no effect if the thread is already terminated.
+        executor.shutdownNow();
+      }
+      fs.delete(logDir, true);
+    }
   }
 
   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
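The hunk above carries the point of HBASE-5150: fail() and assertTrue() calls that run on a bare Thread throw AssertionError into that thread's uncaught-exception handler, so the test itself can still pass; submitting the same Runnable to an ExecutorService and later calling Future.get() rethrows any such failure, wrapped in ExecutionException, on the test thread. A self-contained sketch of the difference (illustration only; ThreadFailureDemo is a hypothetical name, not code from the commit):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: an AssertionError thrown in a raw thread is invisible to the
// caller, while the same failure submitted to an executor is captured by
// the Future and rethrown from get().
public class ThreadFailureDemo {
  public static void main(String[] args) throws Exception {
    // Case 1: the failure only reaches the default handler's stderr;
    // the "test" below still finishes normally.
    Thread t = new Thread() {
      @Override
      public void run() {
        throw new AssertionError("failed in a raw thread: nobody checks this");
      }
    };
    t.start();
    t.join();
    System.out.println("raw thread: test would still pass");

    // Case 2: Future.get() rethrows the failure, wrapped in
    // ExecutionException, on the calling (test) thread.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> result = executor.submit(new Runnable() {
      @Override
      public void run() {
        throw new AssertionError("failed in an executor task");
      }
    });
    try {
      result.get();
    } catch (ExecutionException e) {
      System.out.println("executor: failure is visible: " + e.getCause());
    } finally {
      executor.shutdownNow();
    }
  }
}

In the rewritten test, the final result.get() plays exactly this role: per the diff's own comment, it only returns normally if the runnable finished with no exception thrown.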
@@ -456,7 +456,7 @@ public class TestSplitLogManager {
     assertFalse(fs.exists(emptyLogDirPath));
   }
 
-  @Test
+  @Test(timeout=45000)
   public void testVanishingTaskZNode() throws Exception {
     LOG.info("testVanishingTaskZNode");
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
@@ -466,27 +466,38 @@
     final Path logDir = new Path(fs.getWorkingDirectory(),
         UUID.randomUUID().toString());
     fs.mkdirs(logDir);
-    Path logFile = new Path(logDir, UUID.randomUUID().toString());
-    fs.createNewFile(logFile);
-    new Thread() {
-      public void run() {
-        try {
-          // this call will block because there are no SplitLogWorkers
-          slm.splitLogDistributed(logDir);
-        } catch (Exception e) {
-          LOG.warn("splitLogDistributed failed", e);
-          fail();
-        }
-      }
-    }.start();
-    waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
-    String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
-    // remove the task znode
-    ZKUtil.deleteNode(zkw, znode);
-    waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
-    waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000);
-    assertTrue(fs.exists(logFile));
-    fs.delete(logDir, true);
+    Thread thread = null;
+    try {
+      Path logFile = new Path(logDir, UUID.randomUUID().toString());
+      fs.createNewFile(logFile);
+      thread = new Thread() {
+        public void run() {
+          try {
+            // this call will block because there are no SplitLogWorkers,
+            // until the task znode is deleted below. Then the call will
+            // complete successfully, assuming the log is split.
+            slm.splitLogDistributed(logDir);
+          } catch (Exception e) {
+            LOG.warn("splitLogDistributed failed", e);
+          }
+        }
+      };
+      thread.start();
+      waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
+      String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
+      // remove the task znode, to finish the distributed log splitting
+      ZKUtil.deleteNode(zkw, znode);
+      waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
+      waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000);
+      assertTrue(fs.exists(logFile));
+    } finally {
+      if (thread != null) {
+        // interrupt the thread in case the test fails in the middle.
+        // it has no effect if the thread is already terminated.
+        thread.interrupt();
+      }
+      fs.delete(logDir, true);
+    }
   }
 
   @org.junit.Rule
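Both rewritten tests also move their teardown into a finally block, so a mid-test failure cannot leave the helper thread blocked in splitLogDistributed or leave the temporary log directory behind for later tests. A sketch of that idiom (illustration only; CleanupIdiomDemo is a hypothetical name, and the latch stands in for the blocking splitLogDistributed call):

import java.util.concurrent.CountDownLatch;

// Sketch of the try/finally cleanup idiom both tests adopt: the helper
// thread is interrupted even if an assertion fails mid-test, so one failing
// test cannot leak a blocked thread into the next.
public class CleanupIdiomDemo {
  public static void main(String[] args) throws InterruptedException {
    Thread thread = null;
    try {
      thread = new Thread() {
        @Override
        public void run() {
          try {
            // blocks until interrupted, like the splitLogDistributed call
            new CountDownLatch(1).await();
          } catch (InterruptedException e) {
            System.out.println("helper thread unblocked by interrupt");
          }
        }
      };
      thread.start();
      // ... test assertions would go here; suppose one of them throws ...
    } finally {
      if (thread != null) {
        // it has no effect if the thread is already terminated
        thread.interrupt();
        // wait for the helper to exit before the test returns
        thread.join();
      }
      // temporary files would be deleted here, as fs.delete(logDir, true)
      // is in the tests above
    }
  }
}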