HBASE-14648 Reenable TestWALProcedureStoreOnHDFS#testWalRollOnLowReplication (Heng Chen)

This commit is contained in:
stack 2015-10-24 20:55:37 -07:00
parent 40887c94b7
commit d76dbb4f84
1 changed files with 80 additions and 89 deletions

View File

@ -83,7 +83,7 @@ public class TestWALProcedureStoreOnHDFS {
}
};
private static void setupConf(Configuration conf) {
private static void initConfig(Configuration conf) {
conf.setInt("dfs.replication", 3);
conf.setInt("dfs.namenode.replication.min", 3);
@ -93,20 +93,16 @@ public class TestWALProcedureStoreOnHDFS {
conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
}
@Before
public void setup() throws Exception {
setupConf(UTIL.getConfiguration());
MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
store = ProcedureTestingUtility.createWalStore(
UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
store.registerListener(stopProcedureListener);
store.start(8);
store.recoverLease();
}
@After
public void tearDown() throws Exception {
store.stop(false);
UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
@ -120,107 +116,102 @@ public class TestWALProcedureStoreOnHDFS {
@Test(timeout=60000, expected=RuntimeException.class)
public void testWalAbortOnLowReplication() throws Exception {
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
initConfig(UTIL.getConfiguration());
setup();
try {
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
LOG.info("Stop DataNode");
UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
store.insert(new TestProcedure(1, -1), null);
for (long i = 2; store.isRunning(); ++i) {
LOG.info("Stop DataNode");
UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
store.insert(new TestProcedure(i, -1), null);
Thread.sleep(100);
store.insert(new TestProcedure(1, -1), null);
for (long i = 2; store.isRunning(); ++i) {
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
store.insert(new TestProcedure(i, -1), null);
Thread.sleep(100);
}
assertFalse(store.isRunning());
fail("The store.insert() should throw an exeption");
} finally {
tearDown();
}
assertFalse(store.isRunning());
fail("The store.insert() should throw an exeption");
}
@Test(timeout=60000)
public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {
Threads.sleepWithoutInterrupt(2000);
initConfig(UTIL.getConfiguration());
setup();
try {
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {
Threads.sleepWithoutInterrupt(2000);
}
@Override
public void abortProcess() {}
});
final AtomicInteger reCount = new AtomicInteger(0);
Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
for (int i = 0; i < thread.length; ++i) {
final long procId = i + 1;
thread[i] = new Thread() {
public void run() {
try {
LOG.debug("[S] INSERT " + procId);
store.insert(new TestProcedure(procId, -1), null);
LOG.debug("[E] INSERT " + procId);
} catch (RuntimeException e) {
reCount.incrementAndGet();
LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
}
}
};
thread[i].start();
}
@Override
public void abortProcess() {}
});
Thread.sleep(1000);
LOG.info("Stop DataNode");
UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
final AtomicInteger reCount = new AtomicInteger(0);
Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
for (int i = 0; i < thread.length; ++i) {
final long procId = i + 1;
thread[i] = new Thread() {
public void run() {
try {
LOG.debug("[S] INSERT " + procId);
store.insert(new TestProcedure(procId, -1), null);
LOG.debug("[E] INSERT " + procId);
} catch (RuntimeException e) {
reCount.incrementAndGet();
LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
}
}
};
thread[i].start();
for (int i = 0; i < thread.length; ++i) {
thread[i].join();
}
assertFalse(store.isRunning());
assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
reCount.get() < thread.length);
} finally {
tearDown();
}
Thread.sleep(1000);
LOG.info("Stop DataNode");
UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
for (int i = 0; i < thread.length; ++i) {
thread[i].join();
}
assertFalse(store.isRunning());
assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
reCount.get() < thread.length);
}
@Ignore ("Needs work") @Test(timeout=60000)
@Test(timeout=60000)
public void testWalRollOnLowReplication() throws Exception {
store.unregisterListener(stopProcedureListener);
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {}
@Override
public void abortProcess() {
LOG.info("Aborted!!!!");
}
});
int dnCount = 0;
store.insert(new TestProcedure(1, -1), null);
UTIL.getDFSCluster().restartDataNode(dnCount);
for (long i = 2; i < 100; ++i) {
try {
initConfig(UTIL.getConfiguration());
UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
setup();
try {
int dnCount = 0;
store.insert(new TestProcedure(1, -1), null);
UTIL.getDFSCluster().restartDataNode(dnCount);
for (long i = 2; i < 100; ++i) {
store.insert(new TestProcedure(i, -1), null);
} catch (RuntimeException re) {
String msg = re.getMessage();
// We could get a sync failed here...if the test cluster is crawling such that DN recovery
// is taking a long time. If we've done enough passes, just finish up the test as a 'pass'
if (msg != null && msg.toLowerCase().contains("sync aborted")) {
LOG.info("i=" + i, re);
if (i > 50) {
LOG.info("Returning early... i=" + i + "...We ran enough of this test", re);
return;
}
waitForNumReplicas(3);
Thread.sleep(100);
if ((i % 30) == 0) {
LOG.info("Restart Data Node");
UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
}
throw re;
}
waitForNumReplicas(3);
Thread.sleep(100);
if ((i % 30) == 0) {
LOG.info("Restart Data Node");
UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
}
assertTrue(store.isRunning());
} finally {
tearDown();
}
assertTrue(store.isRunning());
}
public void waitForNumReplicas(int numReplicas) throws Exception {