HBASE-19510 TestDistributedLogSplitting is flakey for AsyncFSWAL

zhangduo 2017-12-14 15:59:41 +08:00
parent cb4bbea0f1
commit a1f8821b85
3 changed files with 208 additions and 187 deletions
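
In outline, the fix parameterizes the test over the WAL provider: the old TestDistributedLogSplitting becomes the abstract base class AbstractTestDLS with a getWalProvider() hook that startCluster() feeds into the "hbase.wal.provider" setting, and two thin subclasses pin the provider to "asyncfs" and "filesystem" so each WAL implementation gets its own suite. A minimal sketch of that pattern, assuming only the property name and provider values taken from the diff below (the class names here are illustrative, not the real ones):

// Sketch only: the real classes are AbstractTestDLS, TestDLSAsyncFSWAL and TestDLSFSHLog below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public abstract class AbstractWalProviderTest {

  // Each concrete subclass pins the WAL implementation under test.
  protected abstract String getWalProvider();

  // The base class injects the provider before the mini cluster is started.
  protected Configuration createClusterConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.wal.provider", getWalProvider());
    return conf;
  }
}

class AsyncFsWalSuite extends AbstractWalProviderTest {
  @Override
  protected String getWalProvider() {
    return "asyncfs";
  }
}

class FsHLogSuite extends AbstractWalProviderTest {
  @Override
  protected String getWalProvider() {
    return "filesystem";
  }
}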

File: AbstractTestDLS.java (renamed from TestDistributedLogSplitting.java)

@@ -51,11 +51,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -76,21 +73,16 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
@@ -99,111 +91,94 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-@Category({MasterTests.class, LargeTests.class})
-@SuppressWarnings("deprecation")
-public class TestDistributedLogSplitting {
+/**
+ * Base class for testing distributed log splitting.
+ */
+public abstract class AbstractTestDLS {
   private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
-  static {
-    // Uncomment the following line if more verbosity is needed for
-    // debugging (see HBASE-12285 for details).
-    //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
-    // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this
-    // turns it off for this test. TODO: Figure out why scr breaks recovery.
-    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
-  }
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Start a cluster with 2 masters and 5 regionservers
+  private static final int NUM_MASTERS = 2;
+  private static final int NUM_RS = 5;
+  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
   @Rule
   public TestName testName = new TestName();
-  TableName tableName;
-  // Start a cluster with 2 masters and 6 regionservers
-  static final int NUM_MASTERS = 2;
-  static final int NUM_RS = 5;
-  static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
-  MiniHBaseCluster cluster;
-  HMaster master;
-  Configuration conf;
-  static Configuration originalConf;
-  static HBaseTestingUtility TEST_UTIL;
-  static MiniZooKeeperCluster zkCluster;
+  private TableName tableName;
+  private MiniHBaseCluster cluster;
+  private HMaster master;
+  private Configuration conf;
   @Rule
   public TestName name = new TestName();
   @BeforeClass
   public static void setup() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
-    zkCluster = TEST_UTIL.startMiniZKCluster();
-    originalConf = TEST_UTIL.getConfiguration();
+    // Uncomment the following line if more verbosity is needed for
+    // debugging (see HBASE-12285 for details).
+    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+    TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniDFSCluster(3);
   }
   @AfterClass
-  public static void tearDown() throws IOException {
-    TEST_UTIL.shutdownMiniZKCluster();
-    TEST_UTIL.shutdownMiniDFSCluster();
-    TEST_UTIL.shutdownMiniHBaseCluster();
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
   }
-  private void startCluster(int num_rs) throws Exception {
+  protected abstract String getWalProvider();
+  private void startCluster(int numRS) throws Exception {
     SplitLogCounters.resetCounters();
     LOG.info("Starting cluster");
-    conf.getLong("hbase.splitlog.max.resubmit", 0);
+    conf.setLong("hbase.splitlog.max.resubmit", 0);
     // Make the failure test faster
     conf.setInt("zookeeper.recovery.retry", 0);
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
     conf.setInt("hbase.regionserver.wal.max.splitters", 3);
     conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
-    TEST_UTIL.shutdownMiniHBaseCluster();
-    TEST_UTIL = new HBaseTestingUtility(conf);
-    TEST_UTIL.setZkCluster(zkCluster);
-    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
+    conf.set("hbase.wal.provider", getWalProvider());
+    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, numRS);
     cluster = TEST_UTIL.getHBaseCluster();
     LOG.info("Waiting for active/ready master");
     cluster.waitForActiveAndReadyMaster();
     master = cluster.getMaster();
-    while (cluster.getLiveRegionServerThreads().size() < num_rs) {
-      Threads.sleep(10);
-    }
+    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return cluster.getLiveRegionServerThreads().size() >= numRS;
+      }
+    });
   }
   @Before
   public void before() throws Exception {
-    // refresh configuration
-    conf = HBaseConfiguration.create(originalConf);
+    conf = TEST_UTIL.getConfiguration();
     tableName = TableName.valueOf(testName.getMethodName());
   }
   @After
   public void after() throws Exception {
-    try {
-      if (TEST_UTIL.getHBaseCluster() != null) {
-        for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
-          mt.getMaster().abort("closing...", null);
-        }
-      }
-      TEST_UTIL.shutdownMiniHBaseCluster();
-    } finally {
-      TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
-      ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
-    }
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
+    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
   }
-  @Test (timeout=300000)
+  @Test(timeout = 300000)
   public void testRecoveredEdits() throws Exception {
     conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
     startCluster(NUM_RS);
-    final int NUM_LOG_LINES = 10000;
-    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
+    int numLogLines = 10000;
+    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);
@@ -214,8 +189,8 @@ public class TestDistributedLogSplitting {
     Path rootdir = FSUtils.getRootDir(conf);
     int numRegions = 50;
-    Table t = installTable(new ZKWatcher(conf, "table-creation", null), numRegions);
-    try {
+    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
+        Table t = installTable(zkw, numRegions)) {
       TableName table = t.getName();
       List<RegionInfo> regions = null;
       HRegionServer hrs = null;
@@ -223,10 +198,12 @@ public class TestDistributedLogSplitting {
         hrs = rsts.get(i).getRegionServer();
         regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
         // At least one RS will have >= to average number of regions.
-        if (regions.size() >= numRegions/NUM_RS) break;
+        if (regions.size() >= numRegions / NUM_RS) {
+          break;
+        }
       }
-      final Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs
-          .getServerName().toString()));
+      Path logDir = new Path(rootdir,
+          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
       LOG.info("#regions = " + regions.size());
       Iterator<RegionInfo> it = regions.iterator();
@@ -238,15 +215,16 @@ public class TestDistributedLogSplitting {
         }
       }
-      makeWAL(hrs, regions, NUM_LOG_LINES, 100);
+      makeWAL(hrs, regions, numLogLines, 100);
       slm.splitLogDistributed(logDir);
       int count = 0;
       for (RegionInfo hri : regions) {
         Path tdir = FSUtils.getTableDir(rootdir, table);
-        Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(
-            HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        @SuppressWarnings("deprecation")
+        Path editsdir = WALSplitter
+            .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
         LOG.debug("checking edits dir " + editsdir);
         FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
           @Override
@@ -258,8 +236,8 @@ public class TestDistributedLogSplitting {
           }
         });
         assertTrue(
            "edits dir should have more than a single file in it. instead has " + files.length,
            files.length > 1);
        for (int i = 0; i < files.length; i++) {
          int c = countWAL(files[i].getPath(), fs, conf);
          count += c;
@@ -269,9 +247,7 @@ public class TestDistributedLogSplitting {
       // check that the log file is moved
       assertFalse(fs.exists(logDir));
-      assertEquals(NUM_LOG_LINES, count);
-    } finally {
-      if (t != null) t.close();
+      assertEquals(numLogLines, count);
     }
   }
@@ -280,18 +256,17 @@ public class TestDistributedLogSplitting {
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
     startCluster(NUM_RS);
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
+    int numRegionsToCreate = 40;
+    int numLogLines = 1000;
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);
-    final ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-    Table ht = installTable(zkw, NUM_REGIONS_TO_CREATE);
-    try {
+    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
+        Table ht = installTable(zkw, numRegionsToCreate);) {
       HRegionServer hrs = findRSToKill(false);
       List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-      makeWAL(hrs, regions, NUM_LOG_LINES, 100);
+      makeWAL(hrs, regions, numLogLines, 100);
       // abort master
       abortMaster(cluster);
@@ -309,71 +284,68 @@ public class TestDistributedLogSplitting {
       });
       Thread.sleep(2000);
-      LOG.info("Current Open Regions:"
-          + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
+      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
       // wait for abort completes
       TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
-          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
-              >= (NUM_REGIONS_TO_CREATE + 1));
+          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
+              .size() >= (numRegionsToCreate + 1));
         }
       });
-      LOG.info("Current Open Regions After Master Node Starts Up:"
-          + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
-      assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
-    } finally {
-      if (ht != null) ht.close();
-      if (zkw != null) zkw.close();
+      LOG.info("Current Open Regions After Master Node Starts Up:" +
+          HBaseTestingUtility.getAllOnlineRegions(cluster).size());
+      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
     }
   }
   /**
-   * The original intention of this test was to force an abort of a region
-   * server and to make sure that the failure path in the region servers is
-   * properly evaluated. But it is difficult to ensure that the region server
-   * doesn't finish the log splitting before it aborts. Also now, there is
-   * this code path where the master will preempt the region server when master
-   * detects that the region server has aborted.
+   * The original intention of this test was to force an abort of a region server and to make sure
+   * that the failure path in the region servers is properly evaluated. But it is difficult to
+   * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
+   * there is this code path where the master will preempt the region server when master detects
+   * that the region server has aborted.
    * @throws Exception
    */
   // Was marked flaky before Distributed Log Replay cleanup.
-  @Test (timeout=300000)
+  @Test(timeout = 300000)
   public void testWorkerAbort() throws Exception {
     LOG.info("testWorkerAbort");
     startCluster(3);
-    final int NUM_LOG_LINES = 10000;
-    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
+    int numLogLines = 10000;
+    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
     FileSystem fs = master.getMasterFileSystem().getFileSystem();
-    final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
     HRegionServer hrs = findRSToKill(false);
     Path rootdir = FSUtils.getRootDir(conf);
     final Path logDir = new Path(rootdir,
         AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
-    Table t = installTable(new ZKWatcher(conf, "table-creation", null), 40);
-    try {
-      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), NUM_LOG_LINES, 100);
+    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
+        Table t = installTable(zkw, 40)) {
+      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
       new Thread() {
         @Override
         public void run() {
-          waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+          try {
+            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+          } catch (InterruptedException e) {
+          }
           for (RegionServerThread rst : rsts) {
             rst.getRegionServer().abort("testing");
             break;
           }
         }
       }.start();
-      // slm.splitLogDistributed(logDir);
       FileStatus[] logfiles = fs.listStatus(logDir);
       TaskBatch batch = new TaskBatch();
       slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
-      //waitForCounter but for one of the 2 counters
+      // waitForCounter but for one of the 2 counters
       long curt = System.currentTimeMillis();
       long waitTime = 80000;
       long endt = curt + waitTime;
@@ -381,7 +353,7 @@ public class TestDistributedLogSplitting {
         if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
             tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
             tot_wkr_preempt_task.sum()) == 0) {
-          Thread.yield();
+          Thread.sleep(100);
           curt = System.currentTimeMillis();
         } else {
           assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
@@ -390,29 +362,23 @@ public class TestDistributedLogSplitting {
           return;
         }
       }
-      fail("none of the following counters went up in " + waitTime +
-          " milliseconds - " +
+      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
           "tot_wkr_task_resigned, tot_wkr_task_err, " +
-          "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
-          "tot_wkr_preempt_task");
-    } finally {
-      if (t != null) t.close();
+          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
     }
   }
-  @Test (timeout=300000)
+  @Test(timeout = 300000)
   public void testThreeRSAbort() throws Exception {
     LOG.info("testThreeRSAbort");
-    final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_ROWS_PER_REGION = 100;
+    int numRegionsToCreate = 40;
+    int numRowsPerRegion = 100;
     startCluster(NUM_RS); // NUM_RS=6.
-    final ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
-    Table table = installTable(zkw, NUM_REGIONS_TO_CREATE);
-    try {
-      populateDataInTable(NUM_ROWS_PER_REGION);
+    try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
+        Table table = installTable(zkw, numRegionsToCreate)) {
+      populateDataInTable(numRowsPerRegion);
       List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
       assertEquals(NUM_RS, rsts.size());
@@ -420,22 +386,24 @@ public class TestDistributedLogSplitting {
       cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
       cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
-      long start = EnvironmentEdgeManager.currentTime();
-      while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
-        if (EnvironmentEdgeManager.currentTime() - start > 60000) {
-          fail("Timed out waiting for server aborts.");
-        }
-        Thread.sleep(200);
-      }
+      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Timed out waiting for server aborts.";
+        }
+      });
       TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
-      assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION, TEST_UTIL.countRows(table));
-    } finally {
-      if (table != null) table.close();
-      if (zkw != null) zkw.close();
+      assertEquals(numRegionsToCreate * numRowsPerRegion, TEST_UTIL.countRows(table));
     }
   }
-  @Test(timeout=30000)
+  @Test(timeout = 30000)
   public void testDelayedDeleteOnFailure() throws Exception {
     LOG.info("testDelayedDeleteOnFailure");
     startCluster(1);
@@ -511,12 +479,13 @@ public class TestDistributedLogSplitting {
     Table ht = installTable(zkw, 10);
     try {
       FileSystem fs = master.getMasterFileSystem().getFileSystem();
-      Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName()));
+      Path tableDir =
+          FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName()));
       List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
       long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
-      WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
+      WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
       assertEquals(newSeqId + 2000,
           WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
       Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
       FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
@@ -538,13 +507,13 @@ public class TestDistributedLogSplitting {
     }
   }
-  Table installTable(ZKWatcher zkw, int nrs) throws Exception {
+  private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
     return installTable(zkw, nrs, 0);
   }
-  Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
+  private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
     // Create a table with regions
-    byte [] family = Bytes.toBytes("family");
+    byte[] family = Bytes.toBytes("family");
     LOG.info("Creating table with " + nrs + " regions");
     Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
     int numRegions = -1;
@@ -588,8 +557,8 @@ public class TestDistributedLogSplitting {
       if (hri.getTable().isSystemTable()) {
         continue;
       }
-      LOG.debug("adding data to rs = " + rst.getName() +
-          " region = "+ hri.getRegionNameAsString());
+      LOG.debug(
+          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
       Region region = hrs.getOnlineRegion(hri.getRegionName());
       assertTrue(region != null);
       putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
@@ -609,8 +578,8 @@ public class TestDistributedLogSplitting {
       if (hri.getTable().isSystemTable()) {
         continue;
       }
-      LOG.debug("adding data to rs = " + mt.getName() +
-          " region = "+ hri.getRegionNameAsString());
+      LOG.debug(
+          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
       Region region = hrs.getOnlineRegion(hri.getRegionName());
       assertTrue(region != null);
       putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
@@ -623,20 +592,18 @@ public class TestDistributedLogSplitting {
     makeWAL(hrs, regions, num_edits, edit_size, true);
   }
-  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions,
-      int num_edits, int edit_size, boolean cleanShutdown) throws IOException {
+  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
+      boolean cleanShutdown) throws IOException {
     // remove root and meta region
     regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    for(Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
+    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
       RegionInfo regionInfo = iter.next();
-      if(regionInfo.getTable().isSystemTable()) {
+      if (regionInfo.getTable().isSystemTable()) {
        iter.remove();
      }
    }
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
-    byte[] value = new byte[edit_size];
+    byte[] value = new byte[editSize];
    List<RegionInfo> hris = new ArrayList<>();
    for (RegionInfo region : regions) {
@@ -646,19 +613,19 @@ public class TestDistributedLogSplitting {
       hris.add(region);
     }
     LOG.info("Creating wal edits across " + hris.size() + " regions.");
-    for (int i = 0; i < edit_size; i++) {
+    for (int i = 0; i < editSize; i++) {
       value[i] = (byte) ('a' + (i % 26));
     }
     int n = hris.size();
     int[] counts = new int[n];
     // sync every ~30k to line up with desired wal rolls
-    final int syncEvery = 30 * 1024 / edit_size;
+    final int syncEvery = 30 * 1024 / editSize;
     MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     if (n > 0) {
-      for (int i = 0; i < num_edits; i += 1) {
+      for (int i = 0; i < numEdits; i += 1) {
         WALEdit e = new WALEdit();
         RegionInfo curRegionInfo = hris.get(i % n);
-        final WAL log = hrs.getWAL(curRegionInfo);
+        WAL log = hrs.getWAL(curRegionInfo);
         byte[] startRow = curRegionInfo.getStartKey();
         if (startRow == null || startRow.length == 0) {
           startRow = new byte[] { 0, 0, 0, 0, 1 };
@@ -668,9 +635,9 @@ public class TestDistributedLogSplitting {
         // HBaseTestingUtility.createMultiRegions use 5 bytes key
         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
         e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
-        log.append(curRegionInfo,
-            new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
-                System.currentTimeMillis(), mvcc), e, true);
+        log.append(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
+            System.currentTimeMillis(), mvcc),
+          e, true);
         if (0 == i % syncEvery) {
           log.sync();
         }
@@ -680,12 +647,12 @@ public class TestDistributedLogSplitting {
     // done as two passes because the regions might share logs. shutdown is idempotent, but sync
     // will cause errors if done after.
     for (RegionInfo info : hris) {
-      final WAL log = hrs.getWAL(info);
+      WAL log = hrs.getWAL(info);
       log.sync();
     }
     if (cleanShutdown) {
       for (RegionInfo info : hris) {
-        final WAL log = hrs.getWAL(info);
+        WAL log = hrs.getWAL(info);
         log.shutdown();
       }
     }
@@ -695,24 +662,15 @@ public class TestDistributedLogSplitting {
     return;
   }
-  private int countWAL(Path log, FileSystem fs, Configuration conf)
-      throws IOException {
+  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
     int count = 0;
-    WAL.Reader in = WALFactory.createReader(fs, log, conf);
-    try {
+    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
       WAL.Entry e;
       while ((e = in.next()) != null) {
         if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
           count++;
         }
       }
-    } finally {
-      try {
-        in.close();
-      } catch (IOException exception) {
-        LOG.warn("Problem closing wal: " + exception.getMessage());
-        LOG.debug("exception details.", exception);
-      }
     }
     return count;
   }
@@ -721,32 +679,31 @@ public class TestDistributedLogSplitting {
     TEST_UTIL.waitUntilNoRegionsInTransition(60000);
   }
-  private void putData(Region region, byte[] startRow, int numRows, byte [] qf,
-      byte [] ...families)
+  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
       throws IOException {
-    for(int i = 0; i < numRows; i++) {
+    for (int i = 0; i < numRows; i++) {
       Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
-      for(byte [] family : families) {
+      for (byte[] family : families) {
        put.addColumn(family, qf, null);
      }
      region.put(put);
    }
  }
-  private void waitForCounter(LongAdder ctr, long oldval, long newval,
-      long timems) {
+  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
+      throws InterruptedException {
    long curt = System.currentTimeMillis();
    long endt = curt + timems;
    while (curt < endt) {
      if (ctr.sum() == oldval) {
-        Thread.yield();
+        Thread.sleep(100);
        curt = System.currentTimeMillis();
      } else {
        assertEquals(newval, ctr.sum());
        return;
      }
    }
-    assertTrue(false);
+    fail();
  }
  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
@@ -769,7 +726,7 @@ public class TestDistributedLogSplitting {
     List<RegionInfo> regions = null;
     HRegionServer hrs = null;
-    for (RegionServerThread rst: rsts) {
+    for (RegionServerThread rst : rsts) {
       hrs = rst.getRegionServer();
       while (rst.isAlive() && !hrs.isOnline()) {
         Thread.sleep(100);
@@ -794,14 +751,14 @@ public class TestDistributedLogSplitting {
       if (isCarryingMeta && hasMetaRegion) {
         // clients ask for a RS with META
         if (!foundTableRegion) {
-          final HRegionServer destRS = hrs;
+          HRegionServer destRS = hrs;
           // the RS doesn't have regions of the specified table so we need move one to this RS
           List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
-          final RegionInfo hri = tableRegions.get(0);
+          RegionInfo hri = tableRegions.get(0);
           TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(),
               Bytes.toBytes(destRS.getServerName().getServerName()));
           // wait for region move completes
-          final RegionStates regionStates =
+          RegionStates regionStates =
               TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
           TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
             @Override
@@ -815,7 +772,9 @@ public class TestDistributedLogSplitting {
       } else if (hasMetaRegion || isCarryingMeta) {
         continue;
       }
-      if (foundTableRegion) break;
+      if (foundTableRegion) {
+        break;
+      }
     }
     return hrs;

File: TestDLSAsyncFSWAL.java (new file)

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.experimental.categories.Category;

@Category({ MasterTests.class, LargeTests.class })
public class TestDLSAsyncFSWAL extends AbstractTestDLS {

  @Override
  protected String getWalProvider() {
    return "asyncfs";
  }
}

File: TestDLSFSHLog.java (new file)

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.experimental.categories.Category;

@Category({ MasterTests.class, LargeTests.class })
public class TestDLSFSHLog extends AbstractTestDLS {

  @Override
  protected String getWalProvider() {
    return "filesystem";
  }
}
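
Either suite can be run on its own (for example via Maven Surefire's -Dtest=TestDLSAsyncFSWAL). Covering a further provider would only take another thin subclass; a hypothetical example, not part of this commit, assuming "multiwal" is also an accepted value of hbase.wal.provider:

// Hypothetical example only, not included in this commit.
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.experimental.categories.Category;

@Category({ MasterTests.class, LargeTests.class })
public class TestDLSMultiWAL extends AbstractTestDLS {

  @Override
  protected String getWalProvider() {
    return "multiwal";
  }
}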