HBASE-4282 RegionServer should abort when WAL close fails with unflushed edits
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1183180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fc5c533c59
commit
8313263677
|
@ -133,6 +133,7 @@ public class HLog implements Syncable {
|
||||||
private final String prefix;
|
private final String prefix;
|
||||||
private final AtomicLong unflushedEntries = new AtomicLong(0);
|
private final AtomicLong unflushedEntries = new AtomicLong(0);
|
||||||
private volatile long syncedTillHere = 0;
|
private volatile long syncedTillHere = 0;
|
||||||
|
private long lastDeferredTxid;
|
||||||
private final Path oldLogDir;
|
private final Path oldLogDir;
|
||||||
private boolean logRollRunning;
|
private boolean logRollRunning;
|
||||||
|
|
||||||
|
@ -196,6 +197,7 @@ public class HLog implements Syncable {
|
||||||
|
|
||||||
//number of transactions in the current Hlog.
|
//number of transactions in the current Hlog.
|
||||||
private final AtomicInteger numEntries = new AtomicInteger(0);
|
private final AtomicInteger numEntries = new AtomicInteger(0);
|
||||||
|
|
||||||
// If live datanode count is lower than the default replicas value,
|
// If live datanode count is lower than the default replicas value,
|
||||||
// RollWriter will be triggered in each sync(So the RollWriter will be
|
// RollWriter will be triggered in each sync(So the RollWriter will be
|
||||||
// triggered one by one in a short time). Using it as a workaround to slow
|
// triggered one by one in a short time). Using it as a workaround to slow
|
||||||
|
@ -813,9 +815,12 @@ public class HLog implements Syncable {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed close of HLog writer", e);
|
LOG.error("Failed close of HLog writer", e);
|
||||||
int errors = closeErrorCount.incrementAndGet();
|
int errors = closeErrorCount.incrementAndGet();
|
||||||
if (errors <= closeErrorsTolerated) {
|
if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
|
||||||
LOG.warn("Riding over HLog close failure! error count="+errors);
|
LOG.warn("Riding over HLog close failure! error count="+errors);
|
||||||
} else {
|
} else {
|
||||||
|
if (hasDeferredEntries()) {
|
||||||
|
LOG.error("Aborting due to unflushed edits in HLog");
|
||||||
|
}
|
||||||
// Failed close of log file. Means we're losing edits. For now,
|
// Failed close of log file. Means we're losing edits. For now,
|
||||||
// shut ourselves down to minimize loss. Alternative is to try and
|
// shut ourselves down to minimize loss. Alternative is to try and
|
||||||
// keep going. See HBASE-930.
|
// keep going. See HBASE-930.
|
||||||
|
@ -990,6 +995,9 @@ public class HLog implements Syncable {
|
||||||
doWrite(regionInfo, logKey, logEdit, htd);
|
doWrite(regionInfo, logKey, logEdit, htd);
|
||||||
txid = this.unflushedEntries.incrementAndGet();
|
txid = this.unflushedEntries.incrementAndGet();
|
||||||
this.numEntries.incrementAndGet();
|
this.numEntries.incrementAndGet();
|
||||||
|
if (htd.isDeferredLogFlush()) {
|
||||||
|
lastDeferredTxid = txid;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync if catalog region, and if not then check if that table supports
|
// Sync if catalog region, and if not then check if that table supports
|
||||||
|
@ -1068,6 +1076,9 @@ public class HLog implements Syncable {
|
||||||
doWrite(info, logKey, edits, htd);
|
doWrite(info, logKey, edits, htd);
|
||||||
this.numEntries.incrementAndGet();
|
this.numEntries.incrementAndGet();
|
||||||
txid = this.unflushedEntries.incrementAndGet();
|
txid = this.unflushedEntries.incrementAndGet();
|
||||||
|
if (htd.isDeferredLogFlush()) {
|
||||||
|
lastDeferredTxid = txid;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Sync if catalog region, and if not then check if that table supports
|
// Sync if catalog region, and if not then check if that table supports
|
||||||
// deferred log flushing
|
// deferred log flushing
|
||||||
|
@ -1789,6 +1800,11 @@ public class HLog implements Syncable {
|
||||||
return coprocessorHost;
|
return coprocessorHost;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Provide access to currently deferred sequence num for tests */
|
||||||
|
boolean hasDeferredEntries() {
|
||||||
|
return lastDeferredTxid > syncedTillHere;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pass one or more log file names and it will either dump out a text version
|
* Pass one or more log file names and it will either dump out a text version
|
||||||
* on <code>stdout</code> or split the specified log files.
|
* on <code>stdout</code> or split the specified log files.
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for conditions that should trigger RegionServer aborts when
|
||||||
|
* rolling the current HLog fails.
|
||||||
|
*/
|
||||||
|
public class TestLogRollAbort {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
|
||||||
|
private static MiniDFSCluster dfsCluster;
|
||||||
|
private static HBaseAdmin admin;
|
||||||
|
private static MiniHBaseCluster cluster;
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
// verbose logging on classes that are touched in these tests
|
||||||
|
{
|
||||||
|
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
|
||||||
|
.getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Need to override this setup so we can edit the config before it gets sent
|
||||||
|
// to the HDFS & HBase cluster startup.
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
// Tweak default timeout values down for faster recovery
|
||||||
|
TEST_UTIL.getConfiguration().setInt(
|
||||||
|
"hbase.regionserver.logroll.errors.tolerated", 2);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
|
||||||
|
|
||||||
|
// Increase the amount of time between client retries
|
||||||
|
TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 5 * 1000);
|
||||||
|
|
||||||
|
// make sure log.hflush() calls syncFs() to open a pipeline
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
|
||||||
|
// lower the namenode & datanode heartbeat so the namenode
|
||||||
|
// quickly detects datanode failures
|
||||||
|
TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
|
||||||
|
// the namenode might still try to choose the recently-dead datanode
|
||||||
|
// for a pipeline, so try to a new pipeline multiple times
|
||||||
|
TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
|
||||||
|
// set periodic sync to 2 min so it doesn't run during test
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.optionallogflushinterval",
|
||||||
|
120 * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster(2);
|
||||||
|
|
||||||
|
cluster = TEST_UTIL.getHBaseCluster();
|
||||||
|
dfsCluster = TEST_UTIL.getDFSCluster();
|
||||||
|
admin = TEST_UTIL.getHBaseAdmin();
|
||||||
|
|
||||||
|
// disable region rebalancing (interferes with log watching)
|
||||||
|
cluster.getMaster().balanceSwitch(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that RegionServer aborts if we hit an error closing the WAL when
|
||||||
|
* there are unsynced WAL edits. See HBASE-4282.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRSAbortWithUnflushedEdits() throws Exception {
|
||||||
|
LOG.info("Starting testRSAbortWithUnflushedEdits()");
|
||||||
|
|
||||||
|
// When the META table can be opened, the region servers are running
|
||||||
|
new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
|
||||||
|
|
||||||
|
// Create the test table and open it
|
||||||
|
String tableName = this.getClass().getSimpleName();
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
|
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||||
|
desc.setDeferredLogFlush(true);
|
||||||
|
|
||||||
|
admin.createTable(desc);
|
||||||
|
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||||
|
|
||||||
|
HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
|
||||||
|
HLog log = server.getWAL();
|
||||||
|
|
||||||
|
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
|
||||||
|
// don't run this test without append support (HDFS-200 & HDFS-142)
|
||||||
|
assertTrue("Need append support for this test",
|
||||||
|
FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
|
||||||
|
|
||||||
|
Put p = new Put(Bytes.toBytes("row2001"));
|
||||||
|
p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
|
||||||
|
table.put(p);
|
||||||
|
|
||||||
|
log.sync();
|
||||||
|
|
||||||
|
p = new Put(Bytes.toBytes("row2002"));
|
||||||
|
p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
|
||||||
|
table.put(p);
|
||||||
|
|
||||||
|
dfsCluster.restartDataNodes();
|
||||||
|
LOG.info("Restarted datanodes");
|
||||||
|
|
||||||
|
assertTrue("Should have an outstanding WAL edit", log.hasDeferredEntries());
|
||||||
|
try {
|
||||||
|
log.rollWriter(true);
|
||||||
|
fail("Log roll should have triggered FailedLogCloseException");
|
||||||
|
} catch (FailedLogCloseException flce) {
|
||||||
|
assertTrue("Should have deferred flush log edits outstanding",
|
||||||
|
log.hasDeferredEntries());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -462,11 +463,10 @@ public class TestLogRolling {
|
||||||
|
|
||||||
// roll all datanodes in the pipeline
|
// roll all datanodes in the pipeline
|
||||||
dfsCluster.restartDataNodes();
|
dfsCluster.restartDataNodes();
|
||||||
Thread.sleep(10000);
|
Thread.sleep(1000);
|
||||||
dfsCluster.waitActive();
|
dfsCluster.waitActive();
|
||||||
LOG.info("Data Nodes restarted");
|
LOG.info("Data Nodes restarted");
|
||||||
|
|
||||||
//this.log.sync();
|
|
||||||
// this write should succeed, but trigger a log roll
|
// this write should succeed, but trigger a log roll
|
||||||
writeData(table, 1003);
|
writeData(table, 1003);
|
||||||
long newFilenum = log.getFilenum();
|
long newFilenum = log.getFilenum();
|
||||||
|
@ -474,12 +474,11 @@ public class TestLogRolling {
|
||||||
assertTrue("Missing datanode should've triggered a log roll",
|
assertTrue("Missing datanode should've triggered a log roll",
|
||||||
newFilenum > oldFilenum && newFilenum > curTime);
|
newFilenum > oldFilenum && newFilenum > curTime);
|
||||||
|
|
||||||
//this.log.sync();
|
|
||||||
writeData(table, 1004);
|
writeData(table, 1004);
|
||||||
|
|
||||||
// roll all datanode again
|
// roll all datanode again
|
||||||
dfsCluster.restartDataNodes();
|
dfsCluster.restartDataNodes();
|
||||||
Thread.sleep(10000);
|
Thread.sleep(1000);
|
||||||
dfsCluster.waitActive();
|
dfsCluster.waitActive();
|
||||||
LOG.info("Data Nodes restarted");
|
LOG.info("Data Nodes restarted");
|
||||||
|
|
||||||
|
@ -536,5 +535,11 @@ public class TestLogRolling {
|
||||||
} finally {
|
} finally {
|
||||||
scanner.close();
|
scanner.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// verify that no region servers aborted
|
||||||
|
for (JVMClusterUtil.RegionServerThread rsThread:
|
||||||
|
TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
|
||||||
|
assertFalse(rsThread.getRegionServer().isAborted());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue