HBASE-14411 Fix unit test failures when using multiwal as default WAL provider (Yu Li)

This commit is contained in:
tedyu 2015-09-16 07:09:20 -07:00
parent 1517deee67
commit 0452ba09b5
4 changed files with 81 additions and 37 deletions

View File

@ -23,6 +23,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
@ -76,6 +77,18 @@ public class DefaultWALProvider implements WALProvider {
}
protected FSHLog log = null;
private WALFactory factory = null;
private Configuration conf = null;
private List<WALActionsListener> listeners = null;
private String providerId = null;
private AtomicBoolean initialized = new AtomicBoolean(false);
// for default wal provider, logPrefix won't change
private String logPrefix = null;
/**
* we synchronized on walCreateLock to prevent wal recreation in different threads
*/
private final Object walCreateLock = new Object();
/**
* @param factory factory that made us, identity used for FS layout. may not be null
@ -87,31 +100,47 @@ public class DefaultWALProvider implements WALProvider {
@Override
public void init(final WALFactory factory, final Configuration conf,
final List<WALActionsListener> listeners, String providerId) throws IOException {
if (null != log) {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
if (null == providerId) {
providerId = DEFAULT_PROVIDER_ID;
this.factory = factory;
this.conf = conf;
this.listeners = listeners;
this.providerId = providerId;
// get log prefix
StringBuilder sb = new StringBuilder().append(factory.factoryId);
if (providerId != null) {
if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) {
sb.append(providerId);
} else {
sb.append(WAL_FILE_NAME_DELIMITER).append(providerId);
}
final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}
logPrefix = sb.toString();
}
@Override
public WAL getWAL(final byte[] identifier) throws IOException {
// must lock since getWAL will create hlog on fs which is time consuming
synchronized (walCreateLock) {
if (log == null) {
log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf,
listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}
}
return log;
}
@Override
public void close() throws IOException {
log.close();
if (log != null) log.close();
}
@Override
public void shutdown() throws IOException {
log.shutdown();
if (log != null) log.shutdown();
}
// should be package private; more visible for use in FSHLog
@ -132,7 +161,7 @@ public class DefaultWALProvider implements WALProvider {
*/
@Override
public long getNumLogFiles() {
return this.log.getNumLogFiles();
return log == null ? 0 : this.log.getNumLogFiles();
}
/**
@ -142,7 +171,7 @@ public class DefaultWALProvider implements WALProvider {
*/
@Override
public long getLogFileSize() {
return this.log.getLogFileSize();
return log == null ? 0 : this.log.getLogFileSize();
}
/**

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -643,7 +644,9 @@ public class TestImportExport {
// Register the wal listener for the import table
TableWALActionListener walListener = new TableWALActionListener(importTableName);
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(null);
HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
wal.registerWALActionsListener(walListener);
// Run the import with SKIP_WAL
@ -659,7 +662,9 @@ public class TestImportExport {
// Run the import with the default durability option
importTableName = "importTestDurability2";
importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
wal.unregisterWALActionsListener(walListener);
region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
walListener = new TableWALActionListener(importTableName);
wal.registerWALActionsListener(walListener);
args = new String[] { importTableName, FQ_OUTPUT_DIR };

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@ -71,8 +72,10 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Test log deletion as logs are rolled.
@ -88,6 +91,7 @@ public class TestLogRolling {
private Admin admin;
private MiniHBaseCluster cluster;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule public final TestName name = new TestName();
public TestLogRolling() {
this.server = null;
@ -213,7 +217,9 @@ public class TestLogRolling {
this.tableName = getName();
// TODO: Why does this write data take for ever?
startAndWriteData();
final WAL log = server.getWAL(null);
HRegionInfo region =
server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
final WAL log = server.getWAL(region);
LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
" log files");
@ -230,8 +236,8 @@ public class TestLogRolling {
assertTrue(("actual count: " + count), count <= 2);
}
private static String getName() {
return "TestLogRolling";
private String getName() {
return "TestLogRolling-" + name.getMethodName();
}
void writeData(Table table, int rownum) throws IOException {
@ -307,7 +313,8 @@ public class TestLogRolling {
assertTrue(((HTable) table).isAutoFlush());
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
final FSHLog log = (FSHLog) server.getWAL(null);
HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
final FSHLog log = (FSHLog) server.getWAL(region);
final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
log.registerWALActionsListener(new WALActionsListener.Base() {
@ -424,7 +431,8 @@ public class TestLogRolling {
Table table = new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
final WAL log = server.getWAL(null);
HRegionInfo region = server.getOnlineRegions(desc.getTableName()).get(0).getRegionInfo();
final WAL log = server.getWAL(region);
final List<Path> paths = new ArrayList<Path>();
final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
@ -564,37 +572,35 @@ public class TestLogRolling {
@Test
public void testCompactionRecordDoesntBlockRolling() throws Exception {
Table table = null;
Table table2 = null;
// When the hbase:meta table can be opened, the region servers are running
Table t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
try {
table = createTestTable(getName());
table2 = createTestTable(getName() + "1");
server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
final WAL log = server.getWAL(null);
Region region = server.getOnlineRegions(table2.getName()).get(0);
Region region = server.getOnlineRegions(table.getName()).get(0);
final WAL log = server.getWAL(region.getRegionInfo());
Store s = region.getStore(HConstants.CATALOG_FAMILY);
//have to flush namespace to ensure it doesn't affect wall tests
admin.flush(TableName.NAMESPACE_TABLE_NAME);
// Put some stuff into table2, to make sure we have some files to compact.
// Put some stuff into table, to make sure we have some files to compact.
for (int i = 1; i <= 2; ++i) {
doPut(table2, i);
admin.flush(table2.getName());
doPut(table, i);
admin.flush(table.getName());
}
doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
assertEquals("Should have no WAL after initial writes", 0,
DefaultWALProvider.getNumRolledLogFiles(log));
assertEquals(2, s.getStorefilesCount());
// Roll the log and compact table2, to have compaction record in the 2nd WAL.
// Roll the log and compact table, to have compaction record in the 2nd WAL.
log.rollWriter();
assertEquals("Should have WAL; one table is not flushed", 1,
DefaultWALProvider.getNumRolledLogFiles(log));
admin.flush(table2.getName());
admin.flush(table.getName());
region.compact(false);
// Wait for compaction in case if flush triggered it before us.
Assert.assertNotNull(s);
@ -604,7 +610,7 @@ public class TestLogRolling {
assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
// Write some value to the table so the WAL cannot be deleted until table is flushed.
doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
assertEquals("Should have WAL; one table is not flushed", 1,
DefaultWALProvider.getNumRolledLogFiles(log));
@ -618,7 +624,6 @@ public class TestLogRolling {
} finally {
if (t != null) t.close();
if (table != null) table.close();
if (table2 != null) table2.close();
}
}

View File

@ -465,6 +465,9 @@ public class TestWALSplit {
}
private void testEmptyLogFiles(final boolean close) throws IOException {
// we won't create the hlog dir until getWAL got called, so
// make dir here when testing empty log file
fs.mkdirs(WALDIR);
injectEmptyFile(".empty", close);
generateWALs(Integer.MAX_VALUE);
injectEmptyFile("empty", close);
@ -593,7 +596,7 @@ public class TestWALSplit {
LOG.debug("split with 'skip errors' set to 'false' correctly threw");
}
assertEquals("if skip.errors is false all files should remain in place",
NUM_WRITERS + 1 /* Factory WAL */, fs.listStatus(WALDIR).length);
NUM_WRITERS, fs.listStatus(WALDIR).length);
}
private void ignoreCorruption(final Corruptions corruption, final int entryCount,
@ -647,8 +650,7 @@ public class TestWALSplit {
useDifferentDFSClient();
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
assertEquals("wrong number of files in the archive log", NUM_WRITERS + 1 /* wal from factory */,
archivedLogs.length);
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}
@Test (timeout=300000)
@ -791,7 +793,7 @@ public class TestWALSplit {
try {
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
assertEquals(NUM_WRITERS + 1 /* wal created by factory */, fs.listStatus(OLDLOGDIR).length);
assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
assertFalse(fs.exists(WALDIR));
} catch (IOException e) {
fail("There shouldn't be any exception but: " + e.toString());
@ -1019,6 +1021,9 @@ public class TestWALSplit {
@Test (timeout=300000)
public void testSplitLogFileEmpty() throws IOException {
LOG.info("testSplitLogFileEmpty");
// we won't create the hlog dir until getWAL got called, so
// make dir here when testing empty log file
fs.mkdirs(WALDIR);
injectEmptyFile(".empty", true);
useDifferentDFSClient();