From 27d59d3ebb7466a601ae66db32ed9ff34dfd213a Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 18 Mar 2016 20:53:29 -0700
Subject: [PATCH] HBASE-15302 Revert two tests which hung in build #31

---
 .../TestMasterFailoverWithProcedures.java     |  513 -------
 .../apache/hadoop/hbase/wal/TestWALSplit.java | 1320 -----------------
 2 files changed, 1833 deletions(-)
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
deleted file mode 100644
index dbb2367e7ef..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.hadoop.hbase.master.procedure; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; -import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState; -import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ModifyRegionUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@Category(LargeTests.class) -public class TestMasterFailoverWithProcedures { - private static final Log LOG = LogFactory.getLog(TestMasterFailoverWithProcedures.class); - - protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static void setupConf(Configuration conf) { - // don't waste time retrying with the roll, the test is already slow enough. 
- conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1); - conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0); - conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1); - conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1); - } - - @Before - public void setup() throws Exception { - setupConf(UTIL.getConfiguration()); - UTIL.startMiniCluster(2, 1); - - final ProcedureExecutor procExec = getMasterProcedureExecutor(); - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false); - } - - @After - public void tearDown() throws Exception { - try { - UTIL.shutdownMiniCluster(); - } catch (Exception e) { - LOG.warn("failure shutting down cluster", e); - } - } - - @Test(timeout=60000) - public void testWalRecoverLease() throws Exception { - final ProcedureStore masterStore = getMasterProcedureExecutor().getStore(); - assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore); - - HMaster firstMaster = UTIL.getHBaseCluster().getMaster(); - // Abort Latch for the master store - final CountDownLatch masterStoreAbort = new CountDownLatch(1); - masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() {} - - @Override - public void abortProcess() { - LOG.debug("Abort store of Master"); - masterStoreAbort.countDown(); - } - }); - - // startup a fake master the new WAL store will take the lease - // and the active master should abort. - HMaster backupMaster3 = Mockito.mock(HMaster.class); - Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration(); - Mockito.doReturn(true).when(backupMaster3).isActiveMaster(); - final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(), - firstMaster.getMasterFileSystem().getFileSystem(), - ((WALProcedureStore)masterStore).getLogDir(), - new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3)); - // Abort Latch for the test store - final CountDownLatch backupStore3Abort = new CountDownLatch(1); - backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() { - @Override - public void postSync() {} - - @Override - public void abortProcess() { - LOG.debug("Abort store of backupMaster3"); - backupStore3Abort.countDown(); - backupStore3.stop(true); - } - }); - backupStore3.start(1); - backupStore3.recoverLease(); - - // Try to trigger a command on the master (WAL lease expired on the active one) - HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f"); - HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null); - LOG.debug("submit proc"); - try { - getMasterProcedureExecutor().submitProcedure( - new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions)); - fail("expected RuntimeException 'sync aborted'"); - } catch (RuntimeException e) { - LOG.info("got " + e.getMessage()); - } - LOG.debug("wait master store abort"); - masterStoreAbort.await(); - - // Now the real backup master should start up - LOG.debug("wait backup master to startup"); - waitBackupMaster(UTIL, firstMaster); - assertEquals(true, firstMaster.isStopped()); - - // wait the store in here to abort (the test will fail due to timeout if it doesn't) - LOG.debug("wait the store to abort"); - backupStore3.getStoreTracker().setDeleted(1, false); - try { - backupStore3.delete(1); - fail("expected RuntimeException 'sync aborted'"); - } catch 
(RuntimeException e) { - LOG.info("got " + e.getMessage()); - } - backupStore3Abort.await(); - } - - /** - * Tests proper fencing in case the current WAL store is fenced - */ - @Test - public void testWALfencingWithoutWALRolling() throws IOException { - testWALfencing(false); - } - - /** - * Tests proper fencing in case the current WAL store does not receive writes until after the - * new WAL does a couple of WAL rolls. - */ - @Test - public void testWALfencingWithWALRolling() throws IOException { - testWALfencing(true); - } - - public void testWALfencing(boolean walRolls) throws IOException { - final ProcedureStore procStore = getMasterProcedureExecutor().getStore(); - assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore); - - HMaster firstMaster = UTIL.getHBaseCluster().getMaster(); - - // cause WAL rolling after a delete in WAL: - firstMaster.getConfiguration().setLong("hbase.procedure.store.wal.roll.threshold", 1); - - HMaster backupMaster3 = Mockito.mock(HMaster.class); - Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration(); - Mockito.doReturn(true).when(backupMaster3).isActiveMaster(); - final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(), - firstMaster.getMasterFileSystem().getFileSystem(), - ((WALProcedureStore)procStore).getLogDir(), - new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3)); - - // start a second store which should fence the first one out - LOG.info("Starting new WALProcedureStore"); - procStore2.start(1); - procStore2.recoverLease(); - - // before writing back to the WAL store, optionally do a couple of WAL rolls (which causes - // to delete the old WAL files). - if (walRolls) { - LOG.info("Inserting into second WALProcedureStore, causing WAL rolls"); - for (int i = 0; i < 512; i++) { - // insert something to the second store then delete it, causing a WAL roll(s) - Procedure proc2 = new TestProcedure(i); - procStore2.insert(proc2, null); - procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later - } - } - - // Now, insert something to the first store, should fail. - // If the store does a WAL roll and continue with another logId without checking higher logIds - // it will incorrectly succeed. - LOG.info("Inserting into first WALProcedureStore"); - try { - procStore.insert(new TestProcedure(11), null); - fail("Inserting into Procedure Store should have failed"); - } catch (Exception ex) { - LOG.info("Received expected exception", ex); - } - } - - // ========================================================================== - // Test Create Table - // ========================================================================== - @Test(timeout=60000) - public void testCreateWithFailover() throws Exception { - // TODO: Should we try every step? (master failover takes long time) - // It is already covered by TestCreateTableProcedure - // but without the master restart, only the executor/store is restarted. - // Without Master restart we may not find bug in the procedure code - // like missing "wait" for resources to be available (e.g. 
RS) - testCreateWithFailoverAtStep(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS.ordinal()); - } - - private void testCreateWithFailoverAtStep(final int step) throws Exception { - final TableName tableName = TableName.valueOf("testCreateWithFailoverAtStep" + step); - - // create the table - ProcedureExecutor procExec = getMasterProcedureExecutor(); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true); - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true); - - // Start the Create procedure && kill the executor - byte[][] splitKeys = null; - HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); - HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); - long procId = procExec.submitProcedure( - new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); - testRecoveryAndDoubleExecution(UTIL, procId, step, CreateTableState.values()); - - MasterProcedureTestingUtility.validateTableCreation( - UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); - } - - // ========================================================================== - // Test Delete Table - // ========================================================================== - @Test(timeout=60000) - public void testDeleteWithFailover() throws Exception { - // TODO: Should we try every step? (master failover takes long time) - // It is already covered by TestDeleteTableProcedure - // but without the master restart, only the executor/store is restarted. - // Without Master restart we may not find bug in the procedure code - // like missing "wait" for resources to be available (e.g. RS) - testDeleteWithFailoverAtStep(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS.ordinal()); - } - - private void testDeleteWithFailoverAtStep(final int step) throws Exception { - final TableName tableName = TableName.valueOf("testDeleteWithFailoverAtStep" + step); - - // create the table - byte[][] splitKeys = null; - HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( - getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); - Path tableDir = FSUtils.getTableDir(getRootDir(), tableName); - MasterProcedureTestingUtility.validateTableCreation( - UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); - UTIL.getHBaseAdmin().disableTable(tableName); - - ProcedureExecutor procExec = getMasterProcedureExecutor(); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true); - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true); - - // Start the Delete procedure && kill the executor - long procId = procExec.submitProcedure( - new DeleteTableProcedure(procExec.getEnvironment(), tableName)); - testRecoveryAndDoubleExecution(UTIL, procId, step, DeleteTableState.values()); - - MasterProcedureTestingUtility.validateTableDeletion( - UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); - } - - // ========================================================================== - // Test Truncate Table - // ========================================================================== - @Test(timeout=90000) - public void testTruncateWithFailover() throws Exception { - // TODO: Should we try every step? (master failover takes long time) - // It is already covered by TestTruncateTableProcedure - // but without the master restart, only the executor/store is restarted. - // Without Master restart we may not find bug in the procedure code - // like missing "wait" for resources to be available (e.g. 
RS) - testTruncateWithFailoverAtStep(true, TruncateTableState.TRUNCATE_TABLE_ADD_TO_META.ordinal()); - } - - private void testTruncateWithFailoverAtStep(final boolean preserveSplits, final int step) - throws Exception { - final TableName tableName = TableName.valueOf("testTruncateWithFailoverAtStep" + step); - - // create the table - final String[] families = new String[] { "f1", "f2" }; - final byte[][] splitKeys = new byte[][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") - }; - HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( - getMasterProcedureExecutor(), tableName, splitKeys, families); - // load and verify that there are rows in the table - MasterProcedureTestingUtility.loadData( - UTIL.getConnection(), tableName, 100, splitKeys, families); - assertEquals(100, UTIL.countRows(tableName)); - // disable the table - UTIL.getHBaseAdmin().disableTable(tableName); - - ProcedureExecutor procExec = getMasterProcedureExecutor(); - ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); - - // Start the Truncate procedure && kill the executor - long procId = procExec.submitProcedure( - new TruncateTableProcedure(procExec.getEnvironment(), tableName, preserveSplits)); - testRecoveryAndDoubleExecution(UTIL, procId, step, TruncateTableState.values()); - - ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); - UTIL.waitUntilAllRegionsAssigned(tableName); - - // validate the table regions and layout - if (preserveSplits) { - assertEquals(1 + splitKeys.length, UTIL.getHBaseAdmin().getTableRegions(tableName).size()); - } else { - regions = UTIL.getHBaseAdmin().getTableRegions(tableName).toArray(new HRegionInfo[1]); - assertEquals(1, regions.length); - } - MasterProcedureTestingUtility.validateTableCreation( - UTIL.getHBaseCluster().getMaster(), tableName, regions, families); - - // verify that there are no rows in the table - assertEquals(0, UTIL.countRows(tableName)); - - // verify that the table is read/writable - MasterProcedureTestingUtility.loadData( - UTIL.getConnection(), tableName, 50, splitKeys, families); - assertEquals(50, UTIL.countRows(tableName)); - } - - // ========================================================================== - // Test Disable Table - // ========================================================================== - @Test(timeout=60000) - public void testDisableTableWithFailover() throws Exception { - // TODO: Should we try every step? (master failover takes long time) - // It is already covered by TestDisableTableProcedure - // but without the master restart, only the executor/store is restarted. - // Without Master restart we may not find bug in the procedure code - // like missing "wait" for resources to be available (e.g. 
RS) - testDisableTableWithFailoverAtStep( - DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE.ordinal()); - } - - private void testDisableTableWithFailoverAtStep(final int step) throws Exception { - final TableName tableName = TableName.valueOf("testDisableTableWithFailoverAtStep" + step); - - // create the table - final byte[][] splitKeys = new byte[][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") - }; - MasterProcedureTestingUtility.createTable( - getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); - - ProcedureExecutor procExec = getMasterProcedureExecutor(); - ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); - - // Start the Delete procedure && kill the executor - long procId = procExec.submitProcedure( - new DisableTableProcedure(procExec.getEnvironment(), tableName, false)); - testRecoveryAndDoubleExecution(UTIL, procId, step, DisableTableState.values()); - - MasterProcedureTestingUtility.validateTableIsDisabled( - UTIL.getHBaseCluster().getMaster(), tableName); - } - - // ========================================================================== - // Test Enable Table - // ========================================================================== - @Test(timeout=60000) - public void testEnableTableWithFailover() throws Exception { - // TODO: Should we try every step? (master failover takes long time) - // It is already covered by TestEnableTableProcedure - // but without the master restart, only the executor/store is restarted. - // Without Master restart we may not find bug in the procedure code - // like missing "wait" for resources to be available (e.g. RS) - testEnableTableWithFailoverAtStep( - EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE.ordinal()); - } - - private void testEnableTableWithFailoverAtStep(final int step) throws Exception { - final TableName tableName = TableName.valueOf("testEnableTableWithFailoverAtStep" + step); - - // create the table - final byte[][] splitKeys = new byte[][] { - Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") - }; - MasterProcedureTestingUtility.createTable( - getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); - UTIL.getHBaseAdmin().disableTable(tableName); - - ProcedureExecutor procExec = getMasterProcedureExecutor(); - ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); - - // Start the Delete procedure && kill the executor - long procId = procExec.submitProcedure( - new EnableTableProcedure(procExec.getEnvironment(), tableName, false)); - testRecoveryAndDoubleExecution(UTIL, procId, step, EnableTableState.values()); - - MasterProcedureTestingUtility.validateTableIsEnabled( - UTIL.getHBaseCluster().getMaster(), tableName); - } - - // ========================================================================== - // Test Helpers - // ========================================================================== - public static void testRecoveryAndDoubleExecution(final HBaseTestingUtility testUtil, - final long procId, final int lastStepBeforeFailover, TState[] states) throws Exception { - ProcedureExecutor procExec = - testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor(); - ProcedureTestingUtility.waitProcedure(procExec, procId); - - for (int i = 0; i < lastStepBeforeFailover; ++i) { - LOG.info("Restart "+ i +" exec state: " + states[i]); - ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); - ProcedureTestingUtility.restart(procExec); - ProcedureTestingUtility.waitProcedure(procExec, procId); - } - 
ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); - - LOG.info("Trigger master failover"); - masterFailover(testUtil); - - procExec = testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor(); - ProcedureTestingUtility.waitProcedure(procExec, procId); - ProcedureTestingUtility.assertProcNotFailed(procExec, procId); - } - - // ========================================================================== - // Master failover utils - // ========================================================================== - public static void masterFailover(final HBaseTestingUtility testUtil) - throws Exception { - MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); - - // Kill the master - HMaster oldMaster = cluster.getMaster(); - cluster.killMaster(cluster.getMaster().getServerName()); - - // Wait the secondary - waitBackupMaster(testUtil, oldMaster); - } - - public static void waitBackupMaster(final HBaseTestingUtility testUtil, - final HMaster oldMaster) throws Exception { - MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); - - HMaster newMaster = cluster.getMaster(); - while (newMaster == null || newMaster == oldMaster) { - Thread.sleep(250); - newMaster = cluster.getMaster(); - } - - while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { - Thread.sleep(250); - } - } - - // ========================================================================== - // Helpers - // ========================================================================== - private MasterProcedureEnv getMasterProcedureEnv() { - return getMasterProcedureExecutor().getEnvironment(); - } - - private ProcedureExecutor getMasterProcedureExecutor() { - return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); - } - - private FileSystem getFileSystem() { - return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem(); - } - - private Path getRootDir() { - return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); - } - - private Path getTempDir() { - return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getTempDir(); - } -} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java deleted file mode 100644 index 67fc60a59a8..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ /dev/null @@ -1,1320 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.wal; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader; -import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; -import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; -import org.apache.hadoop.ipc.RemoteException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; - -/** - * Testing {@link WAL} splitting code. - */ -@Category({RegionServerTests.class, LargeTests.class}) -public class TestWALSplit { - { - // Uncomment the following lines if more verbosity is needed for - // debugging (see HBASE-12285 for details). 
- //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); - //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); - //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); - } - private final static Log LOG = LogFactory.getLog(TestWALSplit.class); - - private static Configuration conf; - private FileSystem fs; - - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private Path HBASEDIR; - private Path WALDIR; - private Path OLDLOGDIR; - private Path CORRUPTDIR; - private Path TABLEDIR; - - private static final int NUM_WRITERS = 10; - private static final int ENTRIES = 10; // entries per writer per region - - private static final String FILENAME_BEING_SPLIT = "testfile"; - private static final TableName TABLE_NAME = - TableName.valueOf("t1"); - private static final byte[] FAMILY = "f1".getBytes(); - private static final byte[] QUALIFIER = "q1".getBytes(); - private static final byte[] VALUE = "v1".getBytes(); - private static final String WAL_FILE_PREFIX = "wal.dat."; - private static List REGIONS = new ArrayList(); - private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors"; - private static String ROBBER; - private static String ZOMBIE; - private static String [] GROUP = new String [] {"supergroup"}; - private RecoveryMode mode; - - static enum Corruptions { - INSERT_GARBAGE_ON_FIRST_LINE, - INSERT_GARBAGE_IN_THE_MIDDLE, - APPEND_GARBAGE, - TRUNCATE, - TRUNCATE_TRAILER - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf = TEST_UTIL.getConfiguration(); - conf.setClass("hbase.regionserver.hlog.writer.impl", - InstrumentedLogWriter.class, Writer.class); - conf.setBoolean("dfs.support.broken.append", true); - conf.setBoolean("dfs.support.append", true); - // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. - System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); - // Create fake maping user to group and set it to the conf. - Map u2g_map = new HashMap(2); - ROBBER = User.getCurrent().getName() + "-robber"; - ZOMBIE = User.getCurrent().getName() + "-zombie"; - u2g_map.put(ROBBER, GROUP); - u2g_map.put(ZOMBIE, GROUP); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); - conf.setInt("dfs.heartbeat.interval", 1); - TEST_UTIL.startMiniDFSCluster(2); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniDFSCluster(); - } - - @Rule - public TestName name = new TestName(); - private WALFactory wals = null; - - @Before - public void setUp() throws Exception { - LOG.info("Cleaning up cluster for new test."); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); - HBASEDIR = TEST_UTIL.createRootDir(); - OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME); - CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME); - TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); - REGIONS.clear(); - Collections.addAll(REGIONS, "bbb", "ccc"); - InstrumentedLogWriter.activateFailure = false; - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? - RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); - wals = new WALFactory(conf, null, name.getMethodName()); - WALDIR = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName())); - //fs.mkdirs(WALDIR); - } - - @After - public void tearDown() throws Exception { - try { - wals.close(); - } catch(IOException exception) { - // Some tests will move WALs out from under us. 
In those cases, we'll get an error on close. - LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" + - " you see a failure look here."); - LOG.debug("exception details", exception); - } finally { - wals = null; - fs.delete(HBASEDIR, true); - } - } - - /** - * Simulates splitting a WAL out from under a regionserver that is still trying to write it. - * Ensures we do not lose edits. - * @throws IOException - * @throws InterruptedException - */ - @Test (timeout=300000) - public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { - final AtomicLong counter = new AtomicLong(0); - AtomicBoolean stop = new AtomicBoolean(false); - // Region we'll write edits too and then later examine to make sure they all made it in. - final String region = REGIONS.get(0); - final int numWriters = 3; - Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters); - try { - long startCount = counter.get(); - zombie.start(); - // Wait till writer starts going. - while (startCount == counter.get()) Threads.sleep(1); - // Give it a second to write a few appends. - Threads.sleep(1000); - final Configuration conf2 = HBaseConfiguration.create(this.conf); - final User robber = User.createUserForTesting(conf2, ROBBER, GROUP); - int count = robber.runAs(new PrivilegedExceptionAction() { - @Override - public Integer run() throws Exception { - StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR) - .append("):\n"); - for (FileStatus status : fs.listStatus(WALDIR)) { - ls.append("\t").append(status.toString()).append("\n"); - } - LOG.debug(ls); - LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); - LOG.info("Finished splitting out from under zombie."); - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); - assertEquals("wrong number of split files for region", numWriters, logfiles.length); - int count = 0; - for (Path logfile: logfiles) { - count += countWAL(logfile); - } - return count; - } - }); - LOG.info("zombie=" + counter.get() + ", robber=" + count); - assertTrue("The log file could have at most 1 extra log entry, but can't have less. " + - "Zombie could write " + counter.get() + " and logfile had only " + count, - counter.get() == count || counter.get() + 1 == count); - } finally { - stop.set(true); - zombie.interrupt(); - Threads.threadDumpingIsAlive(zombie); - } - } - - /** - * This thread will keep writing to a 'wal' file even after the split process has started. - * It simulates a region server that was considered dead but woke up and wrote some more to the - * last log entry. Does its writing as an alternate user in another filesystem instance to - * simulate better it being a regionserver. - */ - class ZombieLastLogWriterRegionServer extends Thread { - final AtomicLong editsCount; - final AtomicBoolean stop; - final int numOfWriters; - /** - * Region to write edits for. 
- */ - final String region; - final User user; - - public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop, - final String region, final int writers) - throws IOException, InterruptedException { - super("ZombieLastLogWriterRegionServer"); - setDaemon(true); - this.stop = stop; - this.editsCount = counter; - this.region = region; - this.user = User.createUserForTesting(conf, ZOMBIE, GROUP); - numOfWriters = writers; - } - - @Override - public void run() { - try { - doWriting(); - } catch (IOException e) { - LOG.warn(getName() + " Writer exiting " + e); - } catch (InterruptedException e) { - LOG.warn(getName() + " Writer exiting " + e); - } - } - - private void doWriting() throws IOException, InterruptedException { - this.user.runAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose - // index we supply here. - int walToKeepOpen = numOfWriters - 1; - // The below method writes numOfWriters files each with ENTRIES entries for a total of - // numOfWriters * ENTRIES added per column family in the region. - Writer writer = null; - try { - writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); - } catch (IOException e1) { - throw new RuntimeException("Failed", e1); - } - // Update counter so has all edits written so far. - editsCount.addAndGet(numOfWriters * ENTRIES); - loop(writer); - // If we've been interruped, then things should have shifted out from under us. - // closing should error - try { - writer.close(); - fail("Writing closing after parsing should give an error."); - } catch (IOException exception) { - LOG.debug("ignoring error when closing final writer.", exception); - } - return null; - } - }); - } - - private void loop(final Writer writer) { - byte [] regionBytes = Bytes.toBytes(this.region); - while (!stop.get()) { - try { - long seq = appendEntry(writer, TABLE_NAME, regionBytes, - ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0); - long count = editsCount.incrementAndGet(); - LOG.info(getName() + " sync count=" + count + ", seq=" + seq); - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // - } - } catch (IOException ex) { - LOG.error(getName() + " ex " + ex.toString()); - if (ex instanceof RemoteException) { - LOG.error("Juliet: got RemoteException " + ex.getMessage() + - " while writing " + (editsCount.get() + 1)); - } else { - LOG.error(getName() + " failed to write....at " + editsCount.get()); - fail("Failed to write " + editsCount.get()); - } - break; - } catch (Throwable t) { - LOG.error(getName() + " HOW? 
" + t); - LOG.debug("exception details", t); - break; - } - } - LOG.info(getName() + " Writer exiting"); - } - } - - /** - * @throws IOException - * @see https://issues.apache.org/jira/browse/HBASE-3020 - */ - @Test (timeout=300000) - public void testRecoveredEditsPathForMeta() throws IOException { - byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); - Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); - Path regiondir = new Path(tdir, - HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); - fs.mkdirs(regiondir); - long now = System.currentTimeMillis(); - Entry entry = - new Entry(new WALKey(encoded, - TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), - new WALEdit()); - Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, - FILENAME_BEING_SPLIT); - String parentOfParent = p.getParent().getParent().getName(); - assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); - } - - /** - * Test old recovered edits file doesn't break WALSplitter. - * This is useful in upgrading old instances. - */ - @Test (timeout=300000) - public void testOldRecoveredEditsFileSidelined() throws IOException { - byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); - Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); - Path regiondir = new Path(tdir, - HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); - fs.mkdirs(regiondir); - long now = System.currentTimeMillis(); - Entry entry = - new Entry(new WALKey(encoded, - TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), - new WALEdit()); - Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); - assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR); - fs.createNewFile(parent); // create a recovered.edits file - - Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, - FILENAME_BEING_SPLIT); - String parentOfParent = p.getParent().getParent().getName(); - assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); - WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); - } - - private void useDifferentDFSClient() throws IOException { - // make fs act as a different client now - // initialize will create a new DFSClient with a new client ID - fs.initialize(fs.getUri(), conf); - } - - @Test (timeout=300000) - public void testSplitPreservesEdits() throws IOException{ - final String REGION = "region__1"; - REGIONS.clear(); - REGIONS.add(REGION); - - generateWALs(1, 10, -1); - useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); - assertEquals(1, splitLog.length); - - assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); - } - - /** - * @param expectedEntries -1 to not assert - * @return the count across all regions - */ - private int splitAndCount(final int expectedFiles, final int expectedEntries) - throws IOException { - useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - int result = 0; - for (String region : REGIONS) { - Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region); - assertEquals(expectedFiles, logfiles.length); - int count = 0; - for (Path logfile: logfiles) { - count += countWAL(logfile); - } - if (-1 != expectedEntries) { - assertEquals(expectedEntries, count); - } - result += count; - } - return 
result; - } - - @Test (timeout=300000) - public void testEmptyLogFiles() throws IOException { - testEmptyLogFiles(true); - } - - @Test (timeout=300000) - public void testEmptyOpenLogFiles() throws IOException { - testEmptyLogFiles(false); - } - - private void testEmptyLogFiles(final boolean close) throws IOException { - // we won't create the hlog dir until getWAL got called, so - // make dir here when testing empty log file - fs.mkdirs(WALDIR); - injectEmptyFile(".empty", close); - generateWALs(Integer.MAX_VALUE); - injectEmptyFile("empty", close); - splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty - } - - @Test (timeout=300000) - public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { - // generate logs but leave wal.dat.5 open. - generateWALs(5); - splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); - } - - @Test (timeout=300000) - public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); - generateWALs(Integer.MAX_VALUE); - corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), - Corruptions.APPEND_GARBAGE, true); - splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); - } - - @Test (timeout=300000) - public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); - generateWALs(Integer.MAX_VALUE); - corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), - Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); - splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt - } - - @Test (timeout=300000) - public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); - generateWALs(Integer.MAX_VALUE); - corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), - Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false); - // the entries in the original logs are alternating regions - // considering the sequence file header, the middle corruption should - // affect at least half of the entries - int goodEntries = (NUM_WRITERS - 1) * ENTRIES; - int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; - int allRegionsCount = splitAndCount(NUM_WRITERS, -1); - assertTrue("The file up to the corrupted area hasn't been parsed", - REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount); - } - - @Test (timeout=300000) - public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, true); - for (FaultySequenceFileLogReader.FailureType failureType : - FaultySequenceFileLogReader.FailureType.values()) { - final Set walDirContents = splitCorruptWALs(failureType); - final Set archivedLogs = new HashSet(); - final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); - for (FileStatus log : fs.listStatus(CORRUPTDIR)) { - archived.append("\n\t").append(log.toString()); - archivedLogs.add(log.getPath().getName()); - } - LOG.debug(archived.toString()); - assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", - walDirContents, archivedLogs); - } - } - - /** - * @return set of wal names present prior to split attempt. 
- * @throws IOException if the split process fails - */ - private Set splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType) - throws IOException { - Class backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", - Reader.class); - InstrumentedLogWriter.activateFailure = false; - - try { - conf.setClass("hbase.regionserver.hlog.reader.impl", - FaultySequenceFileLogReader.class, Reader.class); - conf.set("faultysequencefilelogreader.failuretype", failureType.name()); - // Clean up from previous tests or previous loop - try { - wals.shutdown(); - } catch (IOException exception) { - // since we're splitting out from under the factory, we should expect some closing failures. - LOG.debug("Ignoring problem closing WALFactory.", exception); - } - wals.close(); - try { - for (FileStatus log : fs.listStatus(CORRUPTDIR)) { - fs.delete(log.getPath(), true); - } - } catch (FileNotFoundException exception) { - LOG.debug("no previous CORRUPTDIR to clean."); - } - // change to the faulty reader - wals = new WALFactory(conf, null, name.getMethodName()); - generateWALs(-1); - // Our reader will render all of these files corrupt. - final Set walDirContents = new HashSet(); - for (FileStatus status : fs.listStatus(WALDIR)) { - walDirContents.add(status.getPath().getName()); - } - useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - return walDirContents; - } finally { - conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, - Reader.class); - } - } - - @Test (timeout=300000, expected = IOException.class) - public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() - throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); - splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING); - } - - @Test (timeout=300000) - public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() - throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); - try { - splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING); - } catch (IOException e) { - LOG.debug("split with 'skip errors' set to 'false' correctly threw"); - } - assertEquals("if skip.errors is false all files should remain in place", - NUM_WRITERS, fs.listStatus(WALDIR).length); - } - - private void ignoreCorruption(final Corruptions corruption, final int entryCount, - final int expectedCount) throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); - - final String REGION = "region__1"; - REGIONS.clear(); - REGIONS.add(REGION); - - Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); - generateWALs(1, entryCount, -1); - corruptWAL(c1, corruption, true); - - useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - - Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); - assertEquals(1, splitLog.length); - - int actualCount = 0; - Reader in = wals.createReader(fs, splitLog[0]); - @SuppressWarnings("unused") - Entry entry; - while ((entry = in.next()) != null) ++actualCount; - assertEquals(expectedCount, actualCount); - in.close(); - - // should not have stored the EOF files as corrupt - FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); - assertEquals(archivedLogs.length, 0); - - } - - @Test (timeout=300000) - public void testEOFisIgnored() throws IOException { - int entryCount = 10; - ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1); - } - - @Test (timeout=300000) - public void testCorruptWALTrailer() throws IOException { - int entryCount = 10; - 
ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount); - } - - @Test (timeout=300000) - public void testLogsGetArchivedAfterSplit() throws IOException { - conf.setBoolean(HBASE_SKIP_ERRORS, false); - generateWALs(-1); - useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); - assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); - } - - @Test (timeout=300000) - public void testSplit() throws IOException { - generateWALs(-1); - splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); - } - - @Test (timeout=300000) - public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() - throws IOException { - generateWALs(-1); - useDifferentDFSClient(); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - FileStatus [] statuses = null; - try { - statuses = fs.listStatus(WALDIR); - if (statuses != null) { - fail("Files left in log dir: " + - Joiner.on(",").join(FileUtil.stat2Paths(statuses))); - } - } catch (FileNotFoundException e) { - // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null - } - } - - @Test(timeout=300000, expected = IOException.class) - public void testSplitWillFailIfWritingToRegionFails() throws Exception { - //leave 5th log open so we could append the "trap" - Writer writer = generateWALs(4); - useDifferentDFSClient(); - - String region = "break"; - Path regiondir = new Path(TABLEDIR, region); - fs.mkdirs(regiondir); - - InstrumentedLogWriter.activateFailure = false; - appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), - ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0); - writer.close(); - - try { - InstrumentedLogWriter.activateFailure = true; - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - } catch (IOException e) { - assertTrue(e.getMessage(). - contains("This exception is instrumented and should only be thrown for testing")); - throw e; - } finally { - InstrumentedLogWriter.activateFailure = false; - } - } - - @Test (timeout=300000) - public void testSplitDeletedRegion() throws IOException { - REGIONS.clear(); - String region = "region_that_splits"; - REGIONS.add(region); - - generateWALs(1); - useDifferentDFSClient(); - - Path regiondir = new Path(TABLEDIR, region); - fs.delete(regiondir, true); - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); - assertFalse(fs.exists(regiondir)); - } - - @Test (timeout=300000) - public void testIOEOnOutputThread() throws Exception { - conf.setBoolean(HBASE_SKIP_ERRORS, false); - - generateWALs(-1); - useDifferentDFSClient(); - FileStatus[] logfiles = fs.listStatus(WALDIR); - assertTrue("There should be some log file", - logfiles != null && logfiles.length > 0); - // wals with no entries (like the one we don't use in the factory) - // won't cause a failure since nothing will ever be written. - // pick the largest one since it's most likely to have entries. 
- int largestLogFile = 0; - long largestSize = 0; - for (int i = 0; i < logfiles.length; i++) { - if (logfiles[i].getLen() > largestSize) { - largestLogFile = i; - largestSize = logfiles[i].getLen(); - } - } - assertTrue("There should be some log greater than size 0.", 0 < largestSize); - // Set up a splitter that will throw an IOE on the output side - WALSplitter logSplitter = new WALSplitter(wals, - conf, HBASEDIR, fs, null, null, this.mode) { - @Override - protected Writer createWriter(Path logfile) throws IOException { - Writer mockWriter = Mockito.mock(Writer.class); - Mockito.doThrow(new IOException("Injected")).when( - mockWriter).append(Mockito.any()); - return mockWriter; - } - }; - // Set up a background thread dumper. Needs a thread to depend on and then we need to run - // the thread dumping in a background thread so it does not hold up the test. - final AtomicBoolean stop = new AtomicBoolean(false); - final Thread someOldThread = new Thread("Some-old-thread") { - @Override - public void run() { - while(!stop.get()) Threads.sleep(10); - } - }; - someOldThread.setDaemon(true); - someOldThread.start(); - final Thread t = new Thread("Background-thread-dumper") { - public void run() { - try { - Threads.threadDumpingIsAlive(someOldThread); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }; - t.setDaemon(true); - t.start(); - try { - logSplitter.splitLogFile(logfiles[largestLogFile], null); - fail("Didn't throw!"); - } catch (IOException ioe) { - assertTrue(ioe.toString().contains("Injected")); - } finally { - // Setting this to true will turn off the background thread dumper. - stop.set(true); - } - } - - /** - * @param spiedFs should be instrumented for failure. - */ - private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception { - generateWALs(-1); - useDifferentDFSClient(); - - try { - WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); - assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); - assertFalse(fs.exists(WALDIR)); - } catch (IOException e) { - fail("There shouldn't be any exception but: " + e.toString()); - } - } - - // Test for HBASE-3412 - @Test (timeout=300000) - public void testMovedWALDuringRecovery() throws Exception { - // This partial mock will throw LEE for every file simulating - // files that were moved - FileSystem spiedFs = Mockito.spy(fs); - // The "File does not exist" part is very important, - // that's how it comes out of HDFS - Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")). - when(spiedFs).append(Mockito.any()); - retryOverHdfsProblem(spiedFs); - } - - @Test (timeout=300000) - public void testRetryOpenDuringRecovery() throws Exception { - FileSystem spiedFs = Mockito.spy(fs); - // The "Cannot obtain block length", "Could not obtain the last block", - // and "Blocklist for [^ ]* has changed.*" part is very important, - // that's how it comes out of HDFS. If HDFS changes the exception - // message, this test needs to be adjusted accordingly. - // - // When DFSClient tries to open a file, HDFS needs to locate - // the last block of the file and get its length. However, if the - // last block is under recovery, HDFS may have problem to obtain - // the block length, in which case, retry may help. 
- Mockito.doAnswer(new Answer() { - private final String[] errors = new String[] { - "Cannot obtain block length", "Could not obtain the last block", - "Blocklist for " + OLDLOGDIR + " has changed"}; - private int count = 0; - - public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { - if (count < 3) { - throw new IOException(errors[count++]); - } - return (FSDataInputStream)invocation.callRealMethod(); - } - }).when(spiedFs).open(Mockito.any(), Mockito.anyInt()); - retryOverHdfsProblem(spiedFs); - } - - @Test (timeout=300000) - public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { - generateWALs(1, 10, -1); - FileStatus logfile = fs.listStatus(WALDIR)[0]; - useDifferentDFSClient(); - - final AtomicInteger count = new AtomicInteger(); - - CancelableProgressable localReporter - = new CancelableProgressable() { - @Override - public boolean progress() { - count.getAndIncrement(); - return false; - } - }; - - FileSystem spiedFs = Mockito.spy(fs); - Mockito.doAnswer(new Answer() { - public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(1500); // Sleep a while and wait report status invoked - return (FSDataInputStream)invocation.callRealMethod(); - } - }).when(spiedFs).open(Mockito.any(), Mockito.anyInt()); - - try { - conf.setInt("hbase.splitlog.report.period", 1000); - boolean ret = WALSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals); - assertFalse("Log splitting should failed", ret); - assertTrue(count.get() > 0); - } catch (IOException e) { - fail("There shouldn't be any exception but: " + e.toString()); - } finally { - // reset it back to its default value - conf.setInt("hbase.splitlog.report.period", 59000); - } - } - - /** - * Test log split process with fake data and lots of edits to trigger threading - * issues. - */ - @Test (timeout=300000) - public void testThreading() throws Exception { - doTestThreading(20000, 128*1024*1024, 0); - } - - /** - * Test blocking behavior of the log split process if writers are writing slower - * than the reader is reading. - */ - @Test (timeout=300000) - public void testThreadingSlowWriterSmallBuffer() throws Exception { - doTestThreading(200, 1024, 50); - } - - /** - * Sets up a log splitter with a mock reader and writer. The mock reader generates - * a specified number of edits spread across 5 regions. The mock writer optionally - * sleeps for each edit it is fed. - * * - * After the split is complete, verifies that the statistics show the correct number - * of edits output into each region. 
- * - * @param numFakeEdits number of fake edits to push through pipeline - * @param bufferSize size of in-memory buffer - * @param writerSlowness writer threads will sleep this many ms per edit - */ - private void doTestThreading(final int numFakeEdits, - final int bufferSize, - final int writerSlowness) throws Exception { - - Configuration localConf = new Configuration(conf); - localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); - - // Create a fake log file (we'll override the reader to produce a stream of edits) - Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake"); - FSDataOutputStream out = fs.create(logPath); - out.close(); - - // Make region dirs for our destination regions so the output doesn't get skipped - final List regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); - makeRegionDirs(regions); - - // Create a splitter that reads and writes the data without touching disk - WALSplitter logSplitter = new WALSplitter(wals, - localConf, HBASEDIR, fs, null, null, this.mode) { - - /* Produce a mock writer that doesn't write anywhere */ - @Override - protected Writer createWriter(Path logfile) throws IOException { - Writer mockWriter = Mockito.mock(Writer.class); - Mockito.doAnswer(new Answer() { - int expectedIndex = 0; - - @Override - public Void answer(InvocationOnMock invocation) { - if (writerSlowness > 0) { - try { - Thread.sleep(writerSlowness); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - Entry entry = (Entry) invocation.getArguments()[0]; - WALEdit edit = entry.getEdit(); - List cells = edit.getCells(); - assertEquals(1, cells.size()); - Cell cell = cells.get(0); - - // Check that the edits come in the right order. - assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), - cell.getRowLength())); - expectedIndex++; - return null; - } - }).when(mockWriter).append(Mockito.any()); - return mockWriter; - } - - /* Produce a mock reader that generates fake entries */ - @Override - protected Reader getReader(Path curLogFile, CancelableProgressable reporter) - throws IOException { - Reader mockReader = Mockito.mock(Reader.class); - Mockito.doAnswer(new Answer() { - int index = 0; - - @Override - public Entry answer(InvocationOnMock invocation) throws Throwable { - if (index >= numFakeEdits) return null; - - // Generate r0 through r4 in round robin fashion - int regionIdx = index % regions.size(); - byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; - - Entry ret = createTestEntry(TABLE_NAME, region, - Bytes.toBytes((int)(index / regions.size())), - FAMILY, QUALIFIER, VALUE, index); - index++; - return ret; - } - }).when(mockReader).next(); - return mockReader; - } - }; - - logSplitter.splitLogFile(fs.getFileStatus(logPath), null); - - // Verify number of written edits per region - Map outputCounts = logSplitter.outputSink.getOutputCounts(); - for (Map.Entry entry : outputCounts.entrySet()) { - LOG.info("Got " + entry.getValue() + " output edits for region " + - Bytes.toString(entry.getKey())); - assertEquals((long)entry.getValue(), numFakeEdits / regions.size()); - } - assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size()); - } - - // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests? 
-  @Test (timeout=300000)
-  public void testSplitLogFileDeletedRegionDir() throws IOException {
-    LOG.info("testSplitLogFileDeletedRegionDir");
-    final String REGION = "region__1";
-    REGIONS.clear();
-    REGIONS.add(REGION);
-
-    generateWALs(1, 10, -1);
-    useDifferentDFSClient();
-
-    Path regiondir = new Path(TABLEDIR, REGION);
-    LOG.info("Region directory is " + regiondir);
-    fs.delete(regiondir, true);
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
-    assertFalse(fs.exists(regiondir));
-  }
-
-  @Test (timeout=300000)
-  public void testSplitLogFileEmpty() throws IOException {
-    LOG.info("testSplitLogFileEmpty");
-    // we won't create the hlog dir until getWAL is called, so
-    // make the dir here when testing an empty log file
-    fs.mkdirs(WALDIR);
-    injectEmptyFile(".empty", true);
-    useDifferentDFSClient();
-
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
-    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
-    assertFalse(fs.exists(tdir));
-
-    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
-  }
-
-  @Test (timeout=300000)
-  public void testSplitLogFileMultipleRegions() throws IOException {
-    LOG.info("testSplitLogFileMultipleRegions");
-    generateWALs(1, 10, -1);
-    splitAndCount(1, 10);
-  }
-
-  @Test (timeout=300000)
-  public void testSplitLogFileFirstLineCorruptionLog()
-      throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, true);
-    generateWALs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(WALDIR)[0];
-
-    corruptWAL(logfile.getPath(),
-      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
-
-    useDifferentDFSClient();
-    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
-
-    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
-      "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
-    assertEquals(1, fs.listStatus(corruptDir).length);
-  }
-
-  /**
-   * @throws IOException
-   * @see https://issues.apache.org/jira/browse/HBASE-4862
-   */
-  @Test (timeout=300000)
-  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
-    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
-    // Generate wals for our destination region
-    String regionName = "r0";
-    final Path regiondir = new Path(TABLEDIR, regionName);
-    REGIONS.clear();
-    REGIONS.add(regionName);
-    generateWALs(-1);
-
-    wals.getWAL(Bytes.toBytes(regionName), null);
-    FileStatus[] logfiles = fs.listStatus(WALDIR);
-    assertTrue("There should be some log file",
-      logfiles != null && logfiles.length > 0);
-
-    WALSplitter logSplitter = new WALSplitter(wals,
-        conf, HBASEDIR, fs, null, null, this.mode) {
-      @Override
-      protected Writer createWriter(Path logfile)
-          throws IOException {
-        Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
-        // After creating the writer, simulate a region's
-        // replayRecoveredEditsIfAny(), which gets the SplitEditFiles of this
-        // region and deletes them, excluding files with a '.temp' suffix.
-        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
-        if (files != null && !files.isEmpty()) {
-          for (Path file : files) {
-            if (!this.fs.delete(file, false)) {
-              LOG.error("Failed delete of " + file);
-            } else {
-              LOG.debug("Deleted recovered.edits file=" + file);
-            }
-          }
-        }
-        return writer;
-      }
-    };
-    try {
-      logSplitter.splitLogFile(logfiles[0], null);
-    } catch (IOException e) {
-      LOG.info(e);
-      fail("Got IOException when splitting "
-        + "log, most likely because the file being written does not "
-        + "exist, which is caused by concurrent replayRecoveredEditsIfAny()");
-    }
-    if (fs.exists(CORRUPTDIR)) {
-      if (fs.listStatus(CORRUPTDIR).length > 0) {
-        fail("There are some corrupt logs, "
-          + "most likely caused by concurrent replayRecoveredEditsIfAny()");
-      }
-    }
-  }
-
-  private Writer generateWALs(int leaveOpen) throws IOException {
-    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
-  }
-
-  private void makeRegionDirs(List<String> regions) throws IOException {
-    for (String region : regions) {
-      LOG.debug("Creating dir for region " + region);
-      fs.mkdirs(new Path(TABLEDIR, region));
-    }
-  }
-
-  /**
-   * @param leaveOpen index to leave un-closed. -1 to close all.
-   * @return the writer that's still open, or null if all were closed.
-   */
-  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
-    makeRegionDirs(REGIONS);
-    fs.mkdirs(WALDIR);
-    Writer [] ws = new Writer[writers];
-    int seq = 0;
-    for (int i = 0; i < writers; i++) {
-      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
-      for (int j = 0; j < entries; j++) {
-        int prefix = 0;
-        for (String region : REGIONS) {
-          String row_key = region + prefix++ + i + j;
-          appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
-            VALUE, seq++);
-        }
-      }
-      if (i != leaveOpen) {
-        ws[i].close();
-        LOG.info("Closing writer " + i);
-      }
-    }
-    if (leaveOpen < 0 || leaveOpen >= writers) {
-      return null;
-    }
-    return ws[leaveOpen];
-  }
-
-  private Path[] getLogForRegion(Path rootdir, TableName table, String region)
-      throws IOException {
-    Path tdir = FSUtils.getTableDir(rootdir, table);
-    @SuppressWarnings("deprecation")
-    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
-      Bytes.toString(region.getBytes())));
-    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
-      @Override
-      public boolean accept(Path p) {
-        if (WALSplitter.isSequenceIdFile(p)) {
-          return false;
-        }
-        return true;
-      }
-    });
-    Path[] paths = new Path[files.length];
-    for (int i = 0; i < files.length; i++) {
-      paths[i] = files[i].getPath();
-    }
-    return paths;
-  }
-
-  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
-    FSDataOutputStream out;
-    int fileSize = (int) fs.listStatus(path)[0].getLen();
-
-    FSDataInputStream in = fs.open(path);
-    byte[] corrupted_bytes = new byte[fileSize];
-    in.readFully(0, corrupted_bytes, 0, fileSize);
-    in.close();
-
-    switch (corruption) {
-      case APPEND_GARBAGE:
-        fs.delete(path, false);
-        out = fs.create(path);
-        out.write(corrupted_bytes);
-        out.write("-----".getBytes());
-        closeOrFlush(close, out);
-        break;
-
-      case INSERT_GARBAGE_ON_FIRST_LINE:
-        fs.delete(path, false);
-        out = fs.create(path);
-        out.write(0);
-        out.write(corrupted_bytes);
-        closeOrFlush(close, out);
-        break;
-
-      case INSERT_GARBAGE_IN_THE_MIDDLE:
-        fs.delete(path, false);
-        out = fs.create(path);
-        int middle = (int) Math.floor(corrupted_bytes.length / 2);
-        out.write(corrupted_bytes, 0, middle);
-        out.write(0);
-        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
-        closeOrFlush(close, out);
-        break;
-
-      case TRUNCATE:
-        fs.delete(path, false);
-        out = fs.create(path);
-        out.write(corrupted_bytes, 0, fileSize
-          - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
-        closeOrFlush(close, out);
-        break;
-
-      case TRUNCATE_TRAILER:
-        fs.delete(path, false);
-        out = fs.create(path);
-        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
-        closeOrFlush(close, out);
-        break;
-    }
-  }
-
-  private void closeOrFlush(boolean close, FSDataOutputStream out)
-      throws IOException {
-    if (close) {
-      out.close();
-    } else {
-      Method syncMethod = null;
-      try {
-        syncMethod = out.getClass().getMethod("hflush", new Class []{});
-      } catch (NoSuchMethodException e) {
-        try {
-          syncMethod = out.getClass().getMethod("sync", new Class []{});
-        } catch (NoSuchMethodException ex) {
-          throw new IOException("This version of Hadoop supports " +
-            "neither Syncable.sync() nor Syncable.hflush().");
-        }
-      }
-      try {
-        syncMethod.invoke(out, new Object[]{});
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-      // Not in 0out.hflush();
-    }
-  }
-
-  private int countWAL(Path log) throws IOException {
-    int count = 0;
-    Reader in = wals.createReader(fs, log);
-    while (in.next() != null) {
-      count++;
-    }
-    in.close();
-    return count;
-  }
-
-  public static long appendEntry(Writer writer, TableName table, byte[] region,
-      byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, long seq)
-      throws IOException {
-    LOG.info(Thread.currentThread().getName() + " append");
-    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
-    LOG.info(Thread.currentThread().getName() + " sync");
-    writer.sync();
-    return seq;
-  }
-
-  private static Entry createTestEntry(
-      TableName table, byte[] region,
-      byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, long seq) {
-    long time = System.nanoTime();
-    WALEdit edit = new WALEdit();
-    seq++;
-    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
-    return new Entry(new WALKey(region, table, seq, time,
-      HConstants.DEFAULT_CLUSTER_ID), edit);
-  }
-
-  private void injectEmptyFile(String suffix, boolean closeFile)
-      throws IOException {
-    Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix),
-      conf);
-    if (closeFile) writer.close();
-  }
-
-  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
-    Reader in1, in2;
-    in1 = wals.createReader(fs, p1);
-    in2 = wals.createReader(fs, p2);
-    Entry entry1;
-    Entry entry2;
-    while ((entry1 = in1.next()) != null) {
-      entry2 = in2.next();
-      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
-          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
-        return false;
-      }
-    }
-    in1.close();
-    in2.close();
-    return true;
-  }
-}
\ No newline at end of file
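The closeOrFlush() helper above resolves hflush() or sync() reflectively because, across the Hadoop versions this test builds against, only one of the two names may exist on the output stream. A self-contained sketch of that fallback (FlushFallbackSketch, flushOrSync() and FakeStream are hypothetical names; only java.lang.reflect is assumed) could look like this:

import java.io.IOException;
import java.lang.reflect.Method;

public final class FlushFallbackSketch {

  // Invoke hflush() if the object has one, otherwise fall back to sync();
  // surface reflection failures as IOException, mirroring closeOrFlush() above.
  static void flushOrSync(Object out) throws IOException {
    Method syncMethod;
    try {
      syncMethod = out.getClass().getMethod("hflush");
    } catch (NoSuchMethodException e) {
      try {
        syncMethod = out.getClass().getMethod("sync");
      } catch (NoSuchMethodException ex) {
        throw new IOException("Stream supports neither sync() nor hflush().");
      }
    }
    try {
      syncMethod.invoke(out);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  // Toy stand-in for an output stream that only exposes the newer method name.
  public static class FakeStream {
    public void hflush() {
      System.out.println("hflush() called");
    }
  }

  public static void main(String[] args) throws IOException {
    flushOrSync(new FakeStream());
  }
}

Reflection keeps the helper compilable against either Hadoop line; once every supported version exposes Syncable.hflush() directly, the lookup can be replaced by a plain method call.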