HBASE-14648 Reenable TestWALProcedureStoreOnHDFS#testWalRollOnLowReplication (Heng Chen)
This commit is contained in:
parent
95978477f8
commit
46c646d615
|
@ -66,7 +66,7 @@ public class TestWALProcedureStoreOnHDFS {
|
|||
}
|
||||
};
|
||||
|
||||
private static void setupConf(Configuration conf) {
|
||||
private static void initConfig(Configuration conf) {
|
||||
conf.setInt("dfs.replication", 3);
|
||||
conf.setInt("dfs.namenode.replication.min", 3);
|
||||
|
||||
|
@ -76,20 +76,16 @@ public class TestWALProcedureStoreOnHDFS {
|
|||
conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
setupConf(UTIL.getConfiguration());
|
||||
MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
|
||||
|
||||
Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
|
||||
store = ProcedureTestingUtility.createWalStore(
|
||||
UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
|
||||
store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
|
||||
store.registerListener(stopProcedureListener);
|
||||
store.start(8);
|
||||
store.recoverLease();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
store.stop(false);
|
||||
UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
|
||||
|
@ -103,107 +99,102 @@ public class TestWALProcedureStoreOnHDFS {
|
|||
|
||||
@Test(timeout=60000, expected=RuntimeException.class)
|
||||
public void testWalAbortOnLowReplication() throws Exception {
|
||||
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
|
||||
initConfig(UTIL.getConfiguration());
|
||||
setup();
|
||||
try {
|
||||
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
|
||||
|
||||
LOG.info("Stop DataNode");
|
||||
UTIL.getDFSCluster().stopDataNode(0);
|
||||
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
|
||||
|
||||
store.insert(new TestProcedure(1, -1), null);
|
||||
for (long i = 2; store.isRunning(); ++i) {
|
||||
LOG.info("Stop DataNode");
|
||||
UTIL.getDFSCluster().stopDataNode(0);
|
||||
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
|
||||
store.insert(new TestProcedure(i, -1), null);
|
||||
Thread.sleep(100);
|
||||
|
||||
store.insert(new TestProcedure(1, -1), null);
|
||||
for (long i = 2; store.isRunning(); ++i) {
|
||||
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
|
||||
store.insert(new TestProcedure(i, -1), null);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
assertFalse(store.isRunning());
|
||||
fail("The store.insert() should throw an exeption");
|
||||
} finally {
|
||||
tearDown();
|
||||
}
|
||||
assertFalse(store.isRunning());
|
||||
fail("The store.insert() should throw an exeption");
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
|
||||
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
|
||||
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
|
||||
@Override
|
||||
public void postSync() {
|
||||
Threads.sleepWithoutInterrupt(2000);
|
||||
initConfig(UTIL.getConfiguration());
|
||||
setup();
|
||||
try {
|
||||
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
|
||||
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
|
||||
@Override
|
||||
public void postSync() {
|
||||
Threads.sleepWithoutInterrupt(2000);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortProcess() {}
|
||||
});
|
||||
|
||||
final AtomicInteger reCount = new AtomicInteger(0);
|
||||
Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
|
||||
for (int i = 0; i < thread.length; ++i) {
|
||||
final long procId = i + 1;
|
||||
thread[i] = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
LOG.debug("[S] INSERT " + procId);
|
||||
store.insert(new TestProcedure(procId, -1), null);
|
||||
LOG.debug("[E] INSERT " + procId);
|
||||
} catch (RuntimeException e) {
|
||||
reCount.incrementAndGet();
|
||||
LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
thread[i].start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortProcess() {}
|
||||
});
|
||||
Thread.sleep(1000);
|
||||
LOG.info("Stop DataNode");
|
||||
UTIL.getDFSCluster().stopDataNode(0);
|
||||
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
|
||||
|
||||
final AtomicInteger reCount = new AtomicInteger(0);
|
||||
Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
|
||||
for (int i = 0; i < thread.length; ++i) {
|
||||
final long procId = i + 1;
|
||||
thread[i] = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
LOG.debug("[S] INSERT " + procId);
|
||||
store.insert(new TestProcedure(procId, -1), null);
|
||||
LOG.debug("[E] INSERT " + procId);
|
||||
} catch (RuntimeException e) {
|
||||
reCount.incrementAndGet();
|
||||
LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
thread[i].start();
|
||||
for (int i = 0; i < thread.length; ++i) {
|
||||
thread[i].join();
|
||||
}
|
||||
|
||||
assertFalse(store.isRunning());
|
||||
assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
|
||||
reCount.get() < thread.length);
|
||||
} finally {
|
||||
tearDown();
|
||||
}
|
||||
|
||||
Thread.sleep(1000);
|
||||
LOG.info("Stop DataNode");
|
||||
UTIL.getDFSCluster().stopDataNode(0);
|
||||
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
|
||||
|
||||
for (int i = 0; i < thread.length; ++i) {
|
||||
thread[i].join();
|
||||
}
|
||||
|
||||
assertFalse(store.isRunning());
|
||||
assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
|
||||
reCount.get() < thread.length);
|
||||
}
|
||||
|
||||
@Ignore ("Needs work") @Test(timeout=60000)
|
||||
@Test(timeout=60000)
|
||||
public void testWalRollOnLowReplication() throws Exception {
|
||||
store.unregisterListener(stopProcedureListener);
|
||||
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
|
||||
@Override
|
||||
public void postSync() {}
|
||||
|
||||
@Override
|
||||
public void abortProcess() {
|
||||
LOG.info("Aborted!!!!");
|
||||
}
|
||||
});
|
||||
int dnCount = 0;
|
||||
store.insert(new TestProcedure(1, -1), null);
|
||||
UTIL.getDFSCluster().restartDataNode(dnCount);
|
||||
for (long i = 2; i < 100; ++i) {
|
||||
try {
|
||||
initConfig(UTIL.getConfiguration());
|
||||
UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
|
||||
setup();
|
||||
try {
|
||||
int dnCount = 0;
|
||||
store.insert(new TestProcedure(1, -1), null);
|
||||
UTIL.getDFSCluster().restartDataNode(dnCount);
|
||||
for (long i = 2; i < 100; ++i) {
|
||||
store.insert(new TestProcedure(i, -1), null);
|
||||
} catch (RuntimeException re) {
|
||||
String msg = re.getMessage();
|
||||
// We could get a sync failed here...if the test cluster is crawling such that DN recovery
|
||||
// is taking a long time. If we've done enough passes, just finish up the test as a 'pass'
|
||||
if (msg != null && msg.toLowerCase().contains("sync aborted")) {
|
||||
LOG.info("i=" + i, re);
|
||||
if (i > 50) {
|
||||
LOG.info("Returning early... i=" + i + "...We ran enough of this test", re);
|
||||
return;
|
||||
}
|
||||
waitForNumReplicas(3);
|
||||
Thread.sleep(100);
|
||||
if ((i % 30) == 0) {
|
||||
LOG.info("Restart Data Node");
|
||||
UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
|
||||
}
|
||||
throw re;
|
||||
}
|
||||
waitForNumReplicas(3);
|
||||
Thread.sleep(100);
|
||||
if ((i % 30) == 0) {
|
||||
LOG.info("Restart Data Node");
|
||||
UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
|
||||
}
|
||||
assertTrue(store.isRunning());
|
||||
} finally {
|
||||
tearDown();
|
||||
}
|
||||
assertTrue(store.isRunning());
|
||||
}
|
||||
|
||||
public void waitForNumReplicas(int numReplicas) throws Exception {
|
||||
|
|
Loading…
Reference in New Issue