HBASE-4222 Allow HLog to retry log roll on transient write pipeline errors

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1161320 13f79535-47bb-0310-9956-ffa450edef68
Gary Helmling 2011-08-24 23:41:07 +00:00
parent b0782cbfc0
commit 6f00842284
5 changed files with 239 additions and 23 deletions

CHANGES.txt

@@ -401,6 +401,7 @@ Release 0.91.0 - Unreleased
HBASE-4199 blockCache summary - backend (Doug Meil)
HBASE-4240 Allow Loadbalancer to be pluggable
HBASE-4244 Refactor bin/hbase help
HBASE-4222 Make HLog more resilient to write pipeline failures
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

LogRoller.java

@@ -83,12 +83,14 @@ class LogRoller extends Thread implements WALActionsListener {
if (LOG.isDebugEnabled()) {
LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("HLog roll manually triggered");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
this.lastrolltime = now;
// This is an array of actual region names.
byte [][] regionsToFlush = this.services.getWAL().rollWriter();
byte [][] regionsToFlush = this.services.getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
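
The force flag threaded through here matters: requestLogRoll() sets the roller's rollLog flag, and passing rollLog.get() into rollWriter lets an explicitly requested roll proceed even when the current writer has no entries. A minimal sketch of that contract, with names mirrored from the diff rather than the real HBase classes:

import java.util.concurrent.atomic.AtomicBoolean;

// Illustration only: mirrors the rollLog/force handshake, not the HBase code.
class ForceRollSketch {
  private final AtomicBoolean rollLog = new AtomicBoolean(false);
  private int numEntries = 0;

  // requestLogRoll() path: mark the roll as explicitly requested
  void requestLogRoll() { rollLog.set(true); }

  // matches the new guard in HLog.rollWriter(boolean force)
  byte[][] rollWriter(boolean force) {
    if (!force && numEntries <= 0) {
      return null;   // periodic roll with nothing written is a no-op
    }
    numEntries = 0;  // ...swap in a fresh writer here...
    return null;     // or the regions to flush, as in the real method
  }

  // roller thread: a requested roll is forced, then the flag is cleared
  void runOnce() {
    rollWriter(rollLog.get());
    rollLog.set(false);
  }
}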

HLog.java

@@ -231,6 +231,11 @@ public class HLog implements Syncable {
*/
private final LogSyncer logSyncerThread;
/** Number of log close errors tolerated before we abort */
private final int closeErrorsTolerated;
private final AtomicInteger closeErrorCount = new AtomicInteger();
/**
* Pattern used to validate a HLog file name
*/
@@ -376,6 +381,9 @@ public class HLog implements Syncable {
this.lowReplicationRollLimit = conf.getInt(
"hbase.regionserver.hlog.lowreplication.rolllimit", 5);
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
this.closeErrorsTolerated = conf.getInt(
"hbase.regionserver.logroll.errors.tolerated", 0);
LOG.info("HLog configuration: blocksize=" +
StringUtils.byteDesc(this.blocksize) +
", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
@@ -497,8 +505,35 @@
* @throws IOException
*/
public byte [][] rollWriter() throws FailedLogCloseException, IOException {
return rollWriter(false);
}
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
* Because a log cannot be rolled during a cache flush, and a cache flush
* spans two method calls, a special lock needs to be obtained so that a cache
* flush cannot start when the log is being rolled and the log cannot be
* rolled during a cache flush.
*
<p>Note that this method cannot be synchronized because it is possible that
startCacheFlush runs, obtaining the cacheFlushLock; then this method could
start, which would obtain the lock on this but block on obtaining the
cacheFlushLock; and then completeCacheFlush could be called, which would
wait for the lock on this and consequently never release the cacheFlushLock.
*
* @param force If true, force creation of a new writer even if no entries
* have been written to the current writer
@return If lots of logs, the regions to flush so that next time through we
can clean logs. Returns null if nothing to flush. Names are actual region
names as returned by {@link HRegionInfo#getEncodedName()}
* @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
* @throws IOException
*/
public byte [][] rollWriter(boolean force)
throws FailedLogCloseException, IOException {
// Return if nothing to flush.
if (this.writer != null && this.numEntries.get() <= 0) {
if (!force && this.writer != null && this.numEntries.get() <= 0) {
return null;
}
byte [][] regionsToFlush = null;
@@ -506,6 +541,7 @@
this.logRollRunning = true;
try {
if (closed) {
LOG.debug("HLog closed. Skipping rolling of writer");
return regionsToFlush;
}
// Do all the preparation outside of the updateLock to block
@@ -513,6 +549,9 @@
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
if (LOG.isDebugEnabled()) {
LOG.debug("Enabling new writer for "+FSUtils.getPath(newPath));
}
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
@@ -745,14 +784,21 @@
// Close the current writer, get a new one.
try {
this.writer.close();
closeErrorCount.set(0);
} catch (IOException e) {
// Failed close of log file. Means we're losing edits. For now,
// shut ourselves down to minimize loss. Alternative is to try and
// keep going. See HBASE-930.
FailedLogCloseException flce =
new FailedLogCloseException("#" + currentfilenum);
flce.initCause(e);
throw e;
LOG.error("Failed close of HLog writer", e);
int errors = closeErrorCount.incrementAndGet();
if (errors <= closeErrorsTolerated) {
LOG.warn("Riding over HLog close failure! error count="+errors);
} else {
// Failed close of log file. Means we're losing edits. For now,
// shut ourselves down to minimize loss. Alternative is to try and
// keep going. See HBASE-930.
FailedLogCloseException flce =
new FailedLogCloseException("#" + currentfilenum);
flce.initCause(e);
throw flce;
}
}
if (currentfilenum >= 0) {
oldFile = computeFilename(currentfilenum);
@@ -971,12 +1017,14 @@
// throw exceptions on interrupt
while(!this.isInterrupted()) {
Thread.sleep(this.optionalFlushInterval);
sync();
try {
Thread.sleep(this.optionalFlushInterval);
sync();
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
requestLogRoll();
}
}
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
requestLogRoll();
} catch (InterruptedException e) {
LOG.debug(getName() + " interrupted while waiting for sync requests");
} finally {
@@ -1007,7 +1055,7 @@
}
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
LOG.fatal("Could not sync. Requesting close of hlog", e);
requestLogRoll();
throw e;
}
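
Taken together, the HLog changes above form a single recovery path: a sync failure in LogSyncer now requests a roll instead of dying, a forced roll replaces the writer even with zero entries, and a failed close of the old writer is tolerated for a bounded number of consecutive attempts. A stand-alone sketch of that close-tolerance pattern (a hypothetical Closeable-based helper, not the patched method):

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of the tolerated-close pattern: ride over up to N
// consecutive close failures, resetting the count on any success.
class ToleratedClose {
  private final AtomicInteger closeErrorCount = new AtomicInteger();
  private final int closeErrorsTolerated;

  ToleratedClose(int closeErrorsTolerated) {
    this.closeErrorsTolerated = closeErrorsTolerated;
  }

  void closeWriter(Closeable writer) throws IOException {
    try {
      writer.close();
      closeErrorCount.set(0);   // a successful close resets the streak
    } catch (IOException e) {
      int errors = closeErrorCount.incrementAndGet();
      if (errors > closeErrorsTolerated) {
        throw e;                // too many in a row: let the caller abort
      }
      // transient pipeline error: continue on the new writer and leave the
      // old file for log splitting/recovery to handle
    }
  }
}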

hbase-default.xml

@@ -199,6 +199,15 @@
<description>Period at which we will roll the commit log regardless
of how many edits it has.</description>
</property>
<property>
<name>hbase.regionserver.logroll.errors.tolerated</name>
<value>2</value>
<description>The number of consecutive WAL close errors we will allow
before triggering a server abort. A setting of 0 will cause the
region server to abort if closing the current WAL writer fails during
log rolling. Even a small value (2 or 3) will allow a region server
to ride over transient HDFS errors.</description>
</property>
<property>
<name>hbase.regionserver.hlog.reader.impl</name>
<value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader</value>
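
The new default of 2 can also be overridden per deployment. A hypothetical snippet showing the programmatic equivalent, using the standard HBase Configuration API with the property name from the diff above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

class ConfigureTolerance {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // same effect as the hbase-default.xml entry above
    conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    System.out.println(
        conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0));
  }
}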

TestLogRolling.java

@@ -19,27 +19,37 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,7 +61,8 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -113,6 +124,12 @@ public class TestLogRolling {
// We roll the log after every 32 writes
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.logroll.errors.tolerated", 2);
TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
// For less frequently updated regions flush after every 2 flushes
TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
@@ -140,6 +157,10 @@
"hbase.regionserver.hlog.tolerable.lowreplication", 2);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.hlog.lowreplication.rolllimit", 3);
}
@Before
public void setUp() throws Exception {
TEST_UTIL.startMiniCluster(2);
cluster = TEST_UTIL.getHBaseCluster();
@@ -148,8 +169,8 @@
admin = TEST_UTIL.getHBaseAdmin();
}
@AfterClass
public static void tearDownAfterClass() throws IOException {
@After
public void tearDown() throws IOException {
TEST_UTIL.cleanupTestDir();
TEST_UTIL.shutdownMiniCluster();
}
@@ -300,10 +321,6 @@
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
admin.createTable(desc);
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
@@ -366,4 +383,143 @@
assertTrue("New log file should have the default replication",
log.getLogReplication() == fs.getDefaultReplication());
}
/**
* Test that HLog is rolled when all data nodes in the pipeline have been
* restarted.
* @throws Exception
*/
@Test
public void testLogRollOnPipelineRestart() throws Exception {
assertTrue("This test requires HLog file replication.",
fs.getDefaultReplication() > 1);
LOG.info("Replication=" + fs.getDefaultReplication());
// When the META table can be opened, the region servers are running
new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
this.server = cluster.getRegionServer(0);
this.log = server.getWAL();
// Create the test table and open it
String tableName = getName();
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc);
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
this.log = server.getWAL();
final List<Path> paths = new ArrayList<Path>();
paths.add(log.computeFilename());
log.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRolled(Path newFile) {
paths.add(newFile);
}
@Override
public void logRollRequested() {}
@Override
public void logCloseRequested() {}
@Override
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
WALEdit logEdit) {}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
WALEdit logEdit) {}
});
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test", FSUtils
.isAppendSupported(TEST_UTIL.getConfiguration()));
writeData(table, 2);
table.setAutoFlush(true);
long curTime = System.currentTimeMillis();
long oldFilenum = log.getFilenum();
assertTrue("Log should have a timestamp older than now",
curTime > oldFilenum && oldFilenum != -1);
assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
DatanodeInfo[] pipeline = getPipeline(log);
assertTrue(pipeline.length == fs.getDefaultReplication());
// roll all datanodes in the pipeline
dfsCluster.restartDataNodes();
Thread.sleep(10000);
dfsCluster.waitActive();
LOG.info("Data Nodes restarted");
//this.log.sync();
// this write should succeed, but trigger a log roll
writeData(table, 3);
long newFilenum = log.getFilenum();
assertTrue("Missing datanode should've triggered a log roll",
newFilenum > oldFilenum && newFilenum > curTime);
//this.log.sync();
writeData(table, 4);
// roll all datanodes again
dfsCluster.restartDataNodes();
Thread.sleep(10000);
dfsCluster.waitActive();
LOG.info("Data Nodes restarted");
// this write should succeed, but trigger a log roll
writeData(table, 5);
// force a log roll to read back and verify previously written logs
log.rollWriter();
// read back the data written
Set<String> loggedRows = new HashSet<String>();
for (Path p : paths) {
LOG.debug("Reading HLog "+FSUtils.getPath(p));
HLog.Reader reader = null;
try {
reader = HLog.getReader(fs, p, TEST_UTIL.getConfiguration());
HLog.Entry entry;
while ((entry = reader.next()) != null) {
LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
for (KeyValue kv : entry.getEdit().getKeyValues()) {
loggedRows.add(Bytes.toStringBinary(kv.getRow()));
}
}
} catch (EOFException e) {
LOG.debug("EOF reading file "+FSUtils.getPath(p));
} finally {
if (reader != null) reader.close();
}
}
// verify the written rows are there
assertTrue(loggedRows.contains("row0002"));
assertTrue(loggedRows.contains("row0003"));
assertTrue(loggedRows.contains("row0004"));
assertTrue(loggedRows.contains("row0005"));
// flush all regions
List<HRegion> regions =
new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
for (HRegion r: regions) {
r.flushcache();
}
ResultScanner scanner = table.getScanner(new Scan());
try {
for (int i=2; i<=5; i++) {
Result r = scanner.next();
assertNotNull(r);
assertFalse(r.isEmpty());
assertEquals("row000"+i, Bytes.toString(r.getRow()));
}
} finally {
scanner.close();
}
}
}