HBASE-8314 HLogSplitter can retry to open a 0-length hlog file

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1467790 13f79535-47bb-0310-9956-ffa450edef68
jxiang 2013-04-14 15:23:44 +00:00
parent 6c1e484d36
commit 10a3358bed
3 changed files with 87 additions and 20 deletions


@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class HLogFactory {
private static final Log LOG = LogFactory.getLog(HLogFactory.class);
@@ -67,7 +69,7 @@ public class HLogFactory {
static void resetLogReaderClass() {
logReaderClass = null;
}
/**
* Create a reader for the WAL. If you are reading from a file that's being written to
* and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
@@ -76,28 +78,55 @@
* @throws IOException
*/
  public static HLog.Reader createReader(final FileSystem fs,
      final Path path, Configuration conf) throws IOException {
    if (logReaderClass == null) {
      logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        SequenceFileLogReader.class, Reader.class);
    }

    try {
      // A hlog file could be under recovery, so it may take several
      // tries to get it open. Instead of claiming it is corrupted, retry
      // to open it up to 5 minutes by default.
      long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
      long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
      int nbAttempt = 0;
      while (true) {
        try {
          HLog.Reader reader = logReaderClass.newInstance();
          reader.init(fs, path, conf);
          return reader;
        } catch (IOException e) {
          String msg = e.getMessage();
          if (msg != null && msg.contains("Cannot obtain block length")) {
            if (++nbAttempt == 1) {
              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
            }
            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
              LOG.error("Can't open after " + nbAttempt + " attempts and "
                + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
                + "ms " + " for " + path);
            } else {
              try {
                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
                continue; // retry
              } catch (InterruptedException ie) {
                InterruptedIOException iioe = new InterruptedIOException();
                iioe.initCause(ie);
                throw iioe;
              }
            }
          }
          throw e;
        }
      }
    } catch (IOException ie) {
      throw ie;
    } catch (Exception e) {
      throw new IOException("Cannot get log reader", e);
    }
  }
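For orientation, the following is a minimal caller-side sketch, not part of this commit, of how a reader obtained from createReader is typically consumed; the WAL path is supplied by the caller, and the assumed entry API (HLog.Reader#next(), HLog.Entry#getKey()/getEdit(), close()) is the one used elsewhere in this codebase. With this patch, the createReader call itself absorbs transient "Cannot obtain block length" failures for up to hbase.hlog.open.timeout milliseconds (300000 by default).

// Hedged usage sketch (assumptions noted above); the WAL path is passed in, not invented here.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;

public class WalReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // WAL file to read, e.g. one found under the .logs directory.
    Path walPath = new Path(args[0]);
    // With this patch, createReader retries while HDFS reports "Cannot obtain block length"
    // because the file's last block is still under lease recovery.
    HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
    try {
      HLog.Entry entry;
      while ((entry = reader.next()) != null) {
        System.out.println(entry.getKey() + " -> " + entry.getEdit().size() + " edits");
      }
    } finally {
      reader.close();
    }
  }
}

As the javadoc above notes, a caller tailing a file that is still being written should reuse one reader via HLog.Reader#reset() rather than reopening it through this method.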
/*
* WAL writer
*/


@@ -26,7 +26,6 @@ import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;


@@ -880,6 +880,44 @@ public class TestHLogSplit {
}
}
  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);

    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problem to obtain
    // the block length, in which case, retry may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private int count = 0;

      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count++ < 3) {
          throw new IOException("Cannot obtain block length");
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    HLogSplitter logSplitter = new HLogSplitter(
      conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);

    try {
      logSplitter.splitLog();
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(HLOGDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }
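The retry window this test exercises is bounded by the "hbase.hlog.open.timeout" key read in HLogFactory.createReader above (default 300000 ms, i.e. 5 minutes). As a sketch only, a test or deployment that wants the splitter to give up sooner could lower that value on the same Configuration that reaches the factory; the 10-second figure below is illustrative and is not something this commit sets.

    // Sketch: tighten the WAL-open retry window before splitting. The key and its default come
    // from HLogFactory.createReader above; the constructor call mirrors the test above.
    conf.setInt("hbase.hlog.open.timeout", 10000); // 10 seconds instead of the 5-minute default
    HLogSplitter splitter = new HLogSplitter(
      conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null);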
/**
* Test log split process with fake data and lots of edits to trigger threading
* issues.
@@ -1330,6 +1368,7 @@ public class TestHLogSplit {
private Path getLogForRegion(Path rootdir, byte[] table, String region)
throws IOException {
Path tdir = HTableDescriptor.getTableDir(rootdir, table);
@SuppressWarnings("deprecation")
Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Bytes.toString(region.getBytes())));
FileStatus [] files = this.fs.listStatus(editsdir);