HBASE-2312 [jira] Possible data loss when RS goes into GC pause while rolling HLog

Summary:
There is a narrow corner case in which bad things (i.e. data loss) could happen:
1) RS #1 is about to roll its HLog - it has not yet created the new one, and the old one will get no more writes
2) RS #1 enters a GC pause of death
3) The master lists the HLog files of RS #1 that it has to split (since RS #1 is dead) and starts splitting
4) RS #1 wakes up, creates the new HLog (the previous one was rolled) and appends an edit - which is lost

The following seems like a possible solution:
1) The master detects that RS #1 is dead
2) The master renames the /hbase/.logs/<regionserver name> directory to something else (say /hbase/.logs/<regionserver name>-dead)
3) Add mkdir support (as opposed to mkdirs) to HDFS, so that a file create fails if the parent directory doesn't exist. Dhruba tells me this is very doable.
4) RS #1 comes back up and is not able to create the new HLog. It restarts itself.

Test Plan: EMPTY
Reviewers: JIRA, stack, khemani
Reviewed By: khemani
CC: tedyu, nspiegelberg, stack, Kannan, khemani, jgray
Differential Revision: 99

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1196773 13f79535-47bb-0310-9956-ffa450edef68
parent 119ae0c4ac
commit b44e09085c
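The heart of the fix, as the diff below shows, is that the master renames a dead region server's log directory before splitting it, so a server waking up from a GC pause can no longer create new HLogs under the old path. A minimal standalone sketch of that guard follows, using only the Hadoop FileSystem API; the class and method names here are illustrative, not part of the patch.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only -- not a class from this patch.
public class LogSplitGuard {
  private static final String SPLITTING_EXT = "-splitting";

  // Rename the dead server's log directory before splitting it. After the
  // rename, a region server waking up from a GC pause can no longer append
  // to the old path, provided HLog creation does not recursively create
  // parent directories (HADOOP-6840 / HDFS-617).
  static Path guardLogDir(FileSystem fs, Path logDir) throws IOException {
    Path splitDir = logDir.suffix(SPLITTING_EXT);
    if (fs.exists(logDir) && !fs.rename(logDir, splitDir)) {
      throw new IOException("Failed fs.rename for log split: " + logDir);
    }
    // The master splits the WAL files found under splitDir after this point.
    return splitDir;
  }
}

The renamed "-splitting" directory only helps if HLog creation does not recursively create missing parent directories, which is why the patch also switches SequenceFileLogWriter to the non-recursive SequenceFile.createWriter variant from HADOOP-6840, via reflection.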
@@ -183,55 +183,87 @@ public class MasterFileSystem {
   * {@link ServerName}
   */
  void splitLogAfterStartup(final Set<ServerName> onlineServers) {
    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
        HLog.SPLIT_SKIP_ERRORS_DEFAULT);
    Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
    try {
      if (!this.fs.exists(logsDirPath)) {
        return;
    do {
      List<ServerName> serverNames = new ArrayList<ServerName>();
      try {
        if (!this.fs.exists(logsDirPath)) return;
        FileStatus[] logFolders = this.fs.listStatus(logsDirPath);

        if (logFolders == null || logFolders.length == 0) {
          LOG.debug("No log files to split, proceeding...");
          return;
        }
        for (FileStatus status : logFolders) {
          String sn = status.getPath().getName();
          // truncate splitting suffix if present (for ServerName parsing)
          if (sn.endsWith(HLog.SPLITTING_EXT)) {
            sn = sn.substring(0, sn.length() - HLog.SPLITTING_EXT.length());
          }
          ServerName serverName = ServerName.parseServerName(sn);
          if (!onlineServers.contains(serverName)) {
            LOG.info("Log folder " + status.getPath() + " doesn't belong "
                + "to a known region server, splitting");
            serverNames.add(serverName);
          } else {
            LOG.info("Log folder " + status.getPath()
                + " belongs to an existing region server");
          }
        }
        splitLog(serverNames);
        retrySplitting = false;
      } catch (IOException ioe) {
        LOG.warn("Failed splitting of " + serverNames, ioe);
        if (!checkFileSystem()) {
          LOG.warn("Bad Filesystem, exiting");
          Runtime.getRuntime().halt(1);
        }
        try {
          if (retrySplitting) {
            Thread.sleep(conf.getInt(
                "hbase.hlog.split.failure.retry.interval", 30 * 1000));
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted, returning w/o splitting at startup");
          Thread.currentThread().interrupt();
          retrySplitting = false;
        }
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed exists test on " + logsDirPath, e);
    }
    FileStatus[] logFolders;
    try {
      logFolders = this.fs.listStatus(logsDirPath);
    } catch (IOException e) {
      throw new RuntimeException("Failed listing " + logsDirPath.toString(), e);
    }
    if (logFolders == null || logFolders.length == 0) {
      LOG.debug("No log files to split, proceeding...");
      return;
    }
    List<ServerName> serverNames = new ArrayList<ServerName>();
    for (FileStatus status : logFolders) {
      String sn = status.getPath().getName();
      // Is this old or new style servername? If old style, it will be
      // hostname, colon, and port. If new style, it will be formatted as
      // ServerName.toString.
      ServerName serverName = ServerName.parseServerName(sn);
      if (!onlineServers.contains(serverName)) {
        LOG.info("Log folder " + status.getPath() + " doesn't belong " +
          "to a known region server, splitting");
        serverNames.add(serverName);
      } else {
        LOG.info("Log folder " + status.getPath() +
          " belongs to an existing region server");
      }
    }
    splitLog(serverNames);
    } while (retrySplitting);
  }

  public void splitLog(final ServerName serverName){
  public void splitLog(final ServerName serverName) throws IOException {
    List<ServerName> serverNames = new ArrayList<ServerName>();
    serverNames.add(serverName);
    splitLog(serverNames);
  }

  public void splitLog(final List<ServerName> serverNames) {
  public void splitLog(final List<ServerName> serverNames) throws IOException {
    long splitTime = 0, splitLogSize = 0;
    List<Path> logDirs = new ArrayList<Path>();
    for(ServerName serverName: serverNames){
      Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString()));
      logDirs.add(logDir);
      Path logDir = new Path(this.rootdir,
        HLog.getHLogDirectoryName(serverName.toString()));
      Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
      // rename the directory so a rogue RS doesn't create more HLogs
      if (fs.exists(logDir)) {
        if (!this.fs.rename(logDir, splitDir)) {
          throw new IOException("Failed fs.rename for log split: " + logDir);
        }
        logDir = splitDir;
        LOG.debug("Renamed region directory: " + splitDir);
      } else if (!fs.exists(splitDir)) {
        LOG.info("Log dir for server " + serverName + " does not exist");
        continue;
      }
      logDirs.add(splitDir);
    }

    if (logDirs.isEmpty()) {
      LOG.info("No logs to split");
      return;
    }

    if (distributedLogSplitting) {
@@ -240,15 +272,10 @@ public class MasterFileSystem {
      }
      splitTime = EnvironmentEdgeManager.currentTimeMillis();
      try {
        try {
          splitLogSize = splitLogManager.splitLogDistributed(logDirs);
        } catch (OrphanHLogAfterSplitException e) {
          LOG.warn("Retrying distributed splitting for " +
            serverNames + "because of:", e);
          splitLogManager.splitLogDistributed(logDirs);
        }
      } catch (IOException e) {
        LOG.error("Failed distributed splitting " + serverNames, e);
        splitLogSize = splitLogManager.splitLogDistributed(logDirs);
      } catch (OrphanHLogAfterSplitException e) {
        LOG.warn("Retrying distributed splitting for " + serverNames, e);
        splitLogManager.splitLogDistributed(logDirs);
      }
      splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
    } else {
@@ -272,8 +299,6 @@ public class MasterFileSystem {
        }
        splitTime = splitter.getTime();
        splitLogSize = splitter.getSize();
      } catch (IOException e) {
        LOG.error("Failed splitting " + logDir.toString(), e);
      } finally {
        this.splitLogLock.unlock();
      }
@@ -171,9 +171,14 @@ public class ServerShutdownHandler extends EventHandler {

    try {

      if ( this.shouldSplitHlog ) {
        try {
          LOG.info("Splitting logs for " + serverName);
          this.services.getMasterFileSystem().splitLog(serverName);
        } catch (IOException ioe) {
          this.services.getExecutorService().submit(this);
          this.deadServers.add(serverName);
          throw new IOException("failed log splitting for " +
            serverName + ", will retry", ioe);
        }

      // Assign root and meta if we were carrying them.
@@ -115,6 +115,10 @@ public class HLog implements Syncable {
  public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
  static final byte [] METAROW = Bytes.toBytes("METAROW");

  /** File Extension used while splitting an HLog into regions (HBASE-2312) */
  public static final String SPLITTING_EXT = "-splitting";
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  /*
   * Name of directory that holds recovered edits written by the wal log
   * splitting code, one per region
@@ -793,8 +797,7 @@ public class HLog implements Syncable {
   * @return Path to current writer or null if none.
   * @throws IOException
   */
  private Path cleanupCurrentWriter(final long currentfilenum)
  throws IOException {
  Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
    Path oldFile = null;
    if (this.writer != null) {
      // Close the current writer, get a new one.
@@ -809,6 +812,7 @@ public class HLog implements Syncable {
          sync();
        }
        this.writer.close();
        this.writer = null;
        closeErrorCount.set(0);
      } catch (IOException e) {
        LOG.error("Failed close of HLog writer", e);
@@ -944,7 +948,9 @@ public class HLog implements Syncable {
        if (LOG.isDebugEnabled()) {
          LOG.debug("closing hlog writer in " + this.dir.toString());
        }
        this.writer.close();
        if (this.writer != null) {
          this.writer.close();
        }
      }
    } finally {
      cacheFlushLock.unlock();
@@ -367,7 +367,8 @@ public class HLogSplitter {

    boolean progress_failed = false;

    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      HLog.SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    // How often to send a progress report (default 1/2 master timeout)
    int period = conf.getInt("hbase.splitlog.report.period",
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.commons.logging.Log;
@@ -32,7 +33,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
@@ -78,17 +81,50 @@ public class SequenceFileLogWriter implements HLog.Writer {
    }

    // Create a SF.Writer instance.
    this.writer = SequenceFile.createWriter(fs, conf, path,
      keyClass, WALEdit.class,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      (short) conf.getInt("hbase.regionserver.hlog.replication",
        fs.getDefaultReplication()),
      conf.getLong("hbase.regionserver.hlog.blocksize",
        fs.getDefaultBlockSize()),
      SequenceFile.CompressionType.NONE,
      new DefaultCodec(),
      null,
      new Metadata());
    try {
      // reflection for a version of SequenceFile.createWriter that doesn't
      // automatically create the parent directory (see HBASE-2312)
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
            WALEdit.class,
            new Integer(fs.getConf().getInt("io.file.buffer.size", 4096)),
            new Short((short)
              conf.getInt("hbase.regionserver.hlog.replication",
              fs.getDefaultReplication())),
            new Long(conf.getLong("hbase.regionserver.hlog.blocksize",
              fs.getDefaultBlockSize())),
            new Boolean(false) /*createParent*/,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            new Metadata()
            });
    } catch (InvocationTargetException ite) {
      // function was properly called, but threw it's own exception
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // ignore all other exceptions. related to reflection failure
    }

    // if reflection failed, use the old createWriter
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        HLog.getKeyClass(conf), WALEdit.class,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          fs.getDefaultReplication()),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          fs.getDefaultBlockSize()),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null,
        new Metadata());
    } else {
      LOG.debug("using new createWriter -- HADOOP-6840");
    }

    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    this.syncFs = getSyncFs();
@@ -482,15 +482,20 @@ public class ReplicationSource extends Thread

          Path deadRsDirectory =
            new Path(manager.getLogDir().getParent(), this.deadRegionServers[i]);
          Path possibleLogLocation =
            new Path(deadRsDirectory, currentPath.getName());
          LOG.info("Possible location " + possibleLogLocation.toUri().toString());
          if (this.manager.getFs().exists(possibleLogLocation)) {
            // We found the right new location
            LOG.info("Log " + this.currentPath + " still exists at " +
              possibleLogLocation);
            // Breaking here will make us sleep since reader is null
            return true;
          Path[] locs = new Path[] {
            new Path(deadRsDirectory, currentPath.getName()),
            new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
              currentPath.getName()),
          };
          for (Path possibleLogLocation : locs) {
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (this.manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + this.currentPath + " still exists at " +
                possibleLogLocation);
              // Breaking here will make us sleep since reader is null
              return true;
            }
          }
        }
        // TODO What happens if the log was missing from every single location?
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -57,6 +58,7 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -136,6 +138,8 @@ public class TestHLogSplit {
      assertTrue("Deleting " + dir.getPath(),
        fs.delete(dir.getPath(), true));
    }
    // create the HLog directory because recursive log creates are not allowed
    fs.mkdirs(hlogDir);
    seq = 0;
    regions = new ArrayList<String>();
    Collections.addAll(regions, "bbb", "ccc");
@@ -820,7 +824,72 @@ public class TestHLogSplit {
    assertEquals(regions.size(), outputCounts.size());
  }

  // HBASE-2312: tests the case where a RegionServer enters a GC pause,
  // comes back online after the master declared it dead and started to split.
  // Want log rolling after a master split to fail
  @Test
  @Ignore("Need HADOOP-6886, HADOOP-6840, & HDFS-617 for this. HDFS 0.20.205.1+ should have this")
  public void testLogRollAfterSplitStart() throws IOException {
    // set flush interval to a large number so it doesn't interrupt us
    final String F_INTERVAL = "hbase.regionserver.optionallogflushinterval";
    long oldFlushInterval = conf.getLong(F_INTERVAL, 1000);
    conf.setLong(F_INTERVAL, 1000*1000*100);
    HLog log = null;
    Path thisTestsDir = new Path(hbaseDir, "testLogRollAfterSplitStart");

    try {
      // put some entries in an HLog
      byte [] tableName = Bytes.toBytes(this.getClass().getName());
      HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      log = new HLog(fs, thisTestsDir, oldLogDir, conf);
      final int total = 20;
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
        HTableDescriptor htd = new HTableDescriptor(tableName);
        htd.addFamily(new HColumnDescriptor("column"));
        log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd);
      }
      // Send the data to HDFS datanodes and close the HDFS writer
      log.sync();
      log.cleanupCurrentWriter(log.getFilenum());

      /* code taken from ProcessServerShutdown.process()
       * handles RS shutdowns (as observed by the Master)
       */
      // rename the directory so a rogue RS doesn't create more HLogs
      Path rsSplitDir = new Path(thisTestsDir.getParent(),
        thisTestsDir.getName() + "-splitting");
      fs.rename(thisTestsDir, rsSplitDir);
      LOG.debug("Renamed region directory: " + rsSplitDir);

      // Process the old log files
      HLogSplitter splitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, rsSplitDir, oldLogDir, fs);
      splitter.splitLog();

      // Now, try to roll the HLog and verify failure
      try {
        log.rollWriter();
        Assert.fail("rollWriter() did not throw any exception.");
      } catch (IOException ioe) {
        if (ioe.getCause().getMessage().contains("FileNotFound")) {
          LOG.info("Got the expected exception: ", ioe.getCause());
        } else {
          Assert.fail("Unexpected exception: " + ioe);
        }
      }
    } finally {
      conf.setLong(F_INTERVAL, oldFlushInterval);
      if (log != null) {
        log.close();
      }
      if (fs.exists(thisTestsDir)) {
        fs.delete(thisTestsDir, true);
      }
    }
  }

  /**
   * This thread will keep writing to the file after the split process has started
@@ -1056,6 +1125,7 @@ public class TestHLogSplit {

  private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
    makeRegionDirs(fs, regions);
    fs.mkdirs(hlogDir);
    for (int i = 0; i < writers; i++) {
      writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
      for (int j = 0; j < entries; j++) {