From d76dbb4f84d8e606ab8d319aab7337ad030c85e8 Mon Sep 17 00:00:00 2001 From: stack Date: Sat, 24 Oct 2015 20:55:37 -0700 Subject: [PATCH] HBASE-14648 Reenable TestWALProcedureStoreOnHDFS#testWalRollOnLowReplication (Heng Chen) --- .../TestWALProcedureStoreOnHDFS.java | 169 +++++++++--------- 1 file changed, 80 insertions(+), 89 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java index 67c4d2003eb..ecc75540718 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -83,7 +83,7 @@ public class TestWALProcedureStoreOnHDFS { } }; - private static void setupConf(Configuration conf) { + private static void initConfig(Configuration conf) { conf.setInt("dfs.replication", 3); conf.setInt("dfs.namenode.replication.min", 3); @@ -93,20 +93,16 @@ public class TestWALProcedureStoreOnHDFS { conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10); } - @Before public void setup() throws Exception { - setupConf(UTIL.getConfiguration()); MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); - store = ProcedureTestingUtility.createWalStore( - UTIL.getConfiguration(), dfs.getFileSystem(), logDir); + store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir); store.registerListener(stopProcedureListener); store.start(8); store.recoverLease(); } - @After public void tearDown() throws Exception { store.stop(false); UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true); @@ -120,107 +116,102 @@ public class TestWALProcedureStoreOnHDFS { @Test(timeout=60000, expected=RuntimeException.class) public void testWalAbortOnLowReplication() throws Exception { - assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + initConfig(UTIL.getConfiguration()); + setup(); + try { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - LOG.info("Stop DataNode"); - UTIL.getDFSCluster().stopDataNode(0); - assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - - store.insert(new TestProcedure(1, -1), null); - for (long i = 2; store.isRunning(); ++i) { + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - store.insert(new TestProcedure(i, -1), null); - Thread.sleep(100); + + store.insert(new TestProcedure(1, -1), null); + for (long i = 2; store.isRunning(); ++i) { + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); + store.insert(new TestProcedure(i, -1), null); + Thread.sleep(100); + } + assertFalse(store.isRunning()); + fail("The store.insert() should throw an exeption"); + } finally { + tearDown(); } - assertFalse(store.isRunning()); - fail("The store.insert() should throw an exeption"); } @Test(timeout=60000) public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception { - assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); - store.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() { - Threads.sleepWithoutInterrupt(2000); + initConfig(UTIL.getConfiguration()); + setup(); + try { + assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); + store.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void postSync() { + Threads.sleepWithoutInterrupt(2000); + } + + @Override + public void abortProcess() {} + }); + + final AtomicInteger reCount = new AtomicInteger(0); + Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; + for (int i = 0; i < thread.length; ++i) { + final long procId = i + 1; + thread[i] = new Thread() { + public void run() { + try { + LOG.debug("[S] INSERT " + procId); + store.insert(new TestProcedure(procId, -1), null); + LOG.debug("[E] INSERT " + procId); + } catch (RuntimeException e) { + reCount.incrementAndGet(); + LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); + } + } + }; + thread[i].start(); } - @Override - public void abortProcess() {} - }); + Thread.sleep(1000); + LOG.info("Stop DataNode"); + UTIL.getDFSCluster().stopDataNode(0); + assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - final AtomicInteger reCount = new AtomicInteger(0); - Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; - for (int i = 0; i < thread.length; ++i) { - final long procId = i + 1; - thread[i] = new Thread() { - public void run() { - try { - LOG.debug("[S] INSERT " + procId); - store.insert(new TestProcedure(procId, -1), null); - LOG.debug("[E] INSERT " + procId); - } catch (RuntimeException e) { - reCount.incrementAndGet(); - LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); - } - } - }; - thread[i].start(); + for (int i = 0; i < thread.length; ++i) { + thread[i].join(); + } + + assertFalse(store.isRunning()); + assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && + reCount.get() < thread.length); + } finally { + tearDown(); } - - Thread.sleep(1000); - LOG.info("Stop DataNode"); - UTIL.getDFSCluster().stopDataNode(0); - assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); - - for (int i = 0; i < thread.length; ++i) { - thread[i].join(); - } - - assertFalse(store.isRunning()); - assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && - reCount.get() < thread.length); } - @Ignore ("Needs work") @Test(timeout=60000) + @Test(timeout=60000) public void testWalRollOnLowReplication() throws Exception { - store.unregisterListener(stopProcedureListener); - store.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() {} - - @Override - public void abortProcess() { - LOG.info("Aborted!!!!"); - } - }); - int dnCount = 0; - store.insert(new TestProcedure(1, -1), null); - UTIL.getDFSCluster().restartDataNode(dnCount); - for (long i = 2; i < 100; ++i) { - try { + initConfig(UTIL.getConfiguration()); + UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1); + setup(); + try { + int dnCount = 0; + store.insert(new TestProcedure(1, -1), null); + UTIL.getDFSCluster().restartDataNode(dnCount); + for (long i = 2; i < 100; ++i) { store.insert(new TestProcedure(i, -1), null); - } catch (RuntimeException re) { - String msg = re.getMessage(); - // We could get a sync failed here...if the test cluster is crawling such that DN recovery - // is taking a long time. If we've done enough passes, just finish up the test as a 'pass' - if (msg != null && msg.toLowerCase().contains("sync aborted")) { - LOG.info("i=" + i, re); - if (i > 50) { - LOG.info("Returning early... i=" + i + "...We ran enough of this test", re); - return; - } + waitForNumReplicas(3); + Thread.sleep(100); + if ((i % 30) == 0) { + LOG.info("Restart Data Node"); + UTIL.getDFSCluster().restartDataNode(++dnCount % 3); } - throw re; - } - waitForNumReplicas(3); - Thread.sleep(100); - if ((i % 30) == 0) { - LOG.info("Restart Data Node"); - UTIL.getDFSCluster().restartDataNode(++dnCount % 3); } + assertTrue(store.isRunning()); + } finally { + tearDown(); } - assertTrue(store.isRunning()); } public void waitForNumReplicas(int numReplicas) throws Exception {