diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java index ca463fc3036..f15a38772f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -73,6 +75,18 @@ public class DefaultWALProvider implements WALProvider { } protected FSHLog log = null; + private WALFactory factory = null; + private Configuration conf = null; + private List listeners = null; + private String providerId = null; + private AtomicBoolean initialized = new AtomicBoolean(false); + // for default wal provider, logPrefix won't change + private String logPrefix = null; + + /** + * we synchronized on walCreateLock to prevent wal recreation in different threads + */ + private final Object walCreateLock = new Object(); /** * @param factory factory that made us, identity used for FS layout. may not be null @@ -84,31 +98,47 @@ public class DefaultWALProvider implements WALProvider { @Override public void init(final WALFactory factory, final Configuration conf, final List listeners, String providerId) throws IOException { - if (null != log) { + if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } - if (null == providerId) { - providerId = DEFAULT_PROVIDER_ID; + this.factory = factory; + this.conf = conf; + this.listeners = listeners; + this.providerId = providerId; + // get log prefix + StringBuilder sb = new StringBuilder().append(factory.factoryId); + if (providerId != null) { + if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { + sb.append(providerId); + } else { + sb.append(WAL_FILE_NAME_DELIMITER).append(providerId); + } } - final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; - log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf), - getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, - true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); + logPrefix = sb.toString(); } @Override public WAL getWAL(final byte[] identifier) throws IOException { - return log; + // must lock since getWAL will create hlog on fs which is time consuming + synchronized (walCreateLock) { + if (log == null) { + log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf), + getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, + listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); + } + } + return log; } @Override public void close() throws IOException { - log.close(); + if (log != null) log.close(); } @Override public void shutdown() throws IOException { - log.shutdown(); + if (log != null) log.shutdown(); } // should be package private; more visible for use in FSHLog @@ -129,7 +159,7 @@ public class DefaultWALProvider implements WALProvider { */ @Override public long getNumLogFiles() { - return this.log.getNumLogFiles(); + return log == null ? 0 : this.log.getNumLogFiles(); } /** @@ -139,7 +169,7 @@ public class DefaultWALProvider implements WALProvider { */ @Override public long getLogFileSize() { - return this.log.getLogFileSize(); + return log == null ? 0 : this.log.getLogFileSize(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index a999624acbc..95ca1722a74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; @@ -650,7 +651,9 @@ public class TestImportExport { // Register the wal listener for the import table TableWALActionListener walListener = new TableWALActionListener(importTableName); - WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(null); + HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); wal.registerWALActionsListener(walListener); // Run the import with SKIP_WAL @@ -666,7 +669,9 @@ public class TestImportExport { // Run the import with the default durability option importTableName = "importTestDurability2"; importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - wal.unregisterWALActionsListener(walListener); + region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); walListener = new TableWALActionListener(importTableName); wal.registerWALActionsListener(walListener); args = new String[] { importTableName, FQ_OUTPUT_DIR }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 9ecd4082333..6309e8921af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; @@ -72,8 +73,10 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; /** * Test log deletion as logs are rolled. @@ -89,6 +92,7 @@ public class TestLogRolling { private Admin admin; private MiniHBaseCluster cluster; private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + @Rule public final TestName name = new TestName(); public TestLogRolling() { this.server = null; @@ -212,9 +216,11 @@ public class TestLogRolling { @Test public void testLogRolling() throws Exception { this.tableName = getName(); - // TODO: Why does this write data take for ever? - startAndWriteData(); - final WAL log = server.getWAL(null); + // TODO: Why does this write data take for ever? + startAndWriteData(); + HRegionInfo region = + server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo(); + final WAL log = server.getWAL(region); LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) + " log files"); @@ -231,8 +237,8 @@ public class TestLogRolling { assertTrue(("actual count: " + count), count <= 2); } - private static String getName() { - return "TestLogRolling"; + private String getName() { + return "TestLogRolling-" + name.getMethodName(); } void writeData(Table table, int rownum) throws IOException { @@ -308,7 +314,8 @@ public class TestLogRolling { assertTrue(((HTable) table).isAutoFlush()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); - final FSHLog log = (FSHLog) server.getWAL(null); + HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo(); + final FSHLog log = (FSHLog) server.getWAL(region); final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false); log.registerWALActionsListener(new WALActionsListener.Base() { @@ -425,7 +432,8 @@ public class TestLogRolling { Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); - final WAL log = server.getWAL(null); + HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo(); + final WAL log = server.getWAL(region); final List paths = new ArrayList(); final List preLogRolledCalled = new ArrayList(); @@ -566,37 +574,35 @@ public class TestLogRolling { @Test public void testCompactionRecordDoesntBlockRolling() throws Exception { Table table = null; - Table table2 = null; // When the hbase:meta table can be opened, the region servers are running Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); try { table = createTestTable(getName()); - table2 = createTestTable(getName() + "1"); server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); - final WAL log = server.getWAL(null); - Region region = server.getOnlineRegions(table2.getName()).get(0); + Region region = server.getOnlineRegions(table.getName()).get(0); + final WAL log = server.getWAL(region.getRegionInfo()); Store s = region.getStore(HConstants.CATALOG_FAMILY); //have to flush namespace to ensure it doesn't affect wall tests admin.flush(TableName.NAMESPACE_TABLE_NAME); - // Put some stuff into table2, to make sure we have some files to compact. + // Put some stuff into table, to make sure we have some files to compact. for (int i = 1; i <= 2; ++i) { - doPut(table2, i); - admin.flush(table2.getName()); + doPut(table, i); + admin.flush(table.getName()); } - doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL + doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL assertEquals("Should have no WAL after initial writes", 0, DefaultWALProvider.getNumRolledLogFiles(log)); assertEquals(2, s.getStorefilesCount()); - // Roll the log and compact table2, to have compaction record in the 2nd WAL. + // Roll the log and compact table, to have compaction record in the 2nd WAL. log.rollWriter(); assertEquals("Should have WAL; one table is not flushed", 1, DefaultWALProvider.getNumRolledLogFiles(log)); - admin.flush(table2.getName()); + admin.flush(table.getName()); region.compact(false); // Wait for compaction in case if flush triggered it before us. Assert.assertNotNull(s); @@ -606,7 +612,7 @@ public class TestLogRolling { assertEquals("Compaction didn't happen", 1, s.getStorefilesCount()); // Write some value to the table so the WAL cannot be deleted until table is flushed. - doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table. + doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table. log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. assertEquals("Should have WAL; one table is not flushed", 1, DefaultWALProvider.getNumRolledLogFiles(log)); @@ -620,7 +626,6 @@ public class TestLogRolling { } finally { if (t != null) t.close(); if (table != null) table.close(); - if (table2 != null) table2.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 0fd44f5dc3d..a32aede16c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -464,6 +464,9 @@ public class TestWALSplit { } private void testEmptyLogFiles(final boolean close) throws IOException { + // we won't create the hlog dir until getWAL got called, so + // make dir here when testing empty log file + fs.mkdirs(WALDIR); injectEmptyFile(".empty", close); generateWALs(Integer.MAX_VALUE); injectEmptyFile("empty", close); @@ -592,7 +595,7 @@ public class TestWALSplit { LOG.debug("split with 'skip errors' set to 'false' correctly threw"); } assertEquals("if skip.errors is false all files should remain in place", - NUM_WRITERS + 1 /* Factory WAL */, fs.listStatus(WALDIR).length); + NUM_WRITERS, fs.listStatus(WALDIR).length); } private void ignoreCorruption(final Corruptions corruption, final int entryCount, @@ -646,8 +649,7 @@ public class TestWALSplit { useDifferentDFSClient(); WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); - assertEquals("wrong number of files in the archive log", NUM_WRITERS + 1 /* wal from factory */, - archivedLogs.length); + assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); } @Test (timeout=300000) @@ -790,7 +792,7 @@ public class TestWALSplit { try { WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); - assertEquals(NUM_WRITERS + 1 /* wal created by factory */, fs.listStatus(OLDLOGDIR).length); + assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); assertFalse(fs.exists(WALDIR)); } catch (IOException e) { fail("There shouldn't be any exception but: " + e.toString()); @@ -1019,6 +1021,9 @@ public class TestWALSplit { @Test (timeout=300000) public void testSplitLogFileEmpty() throws IOException { LOG.info("testSplitLogFileEmpty"); + // we won't create the hlog dir until getWAL got called, so + // make dir here when testing empty log file + fs.mkdirs(WALDIR); injectEmptyFile(".empty", true); useDifferentDFSClient();