diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
index eaee4346033..f96f7e0532a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
@@ -21,6 +21,7 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 public class HLogFactory {
   private static final Log LOG = LogFactory.getLog(HLogFactory.class);
@@ -67,7 +69,7 @@ public class HLogFactory {
   static void resetLogReaderClass() {
     logReaderClass = null;
   }
-  
+
   /**
   * Create a reader for the WAL. If you are reading from a file that's being written to
   * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
@@ -76,28 +78,55 @@ public class HLogFactory {
   * @throws IOException
   */
  public static HLog.Reader createReader(final FileSystem fs,
-    final Path path, Configuration conf)
-      throws IOException {
-    try {
-
-      if (logReaderClass == null) {
-
-        logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
-                SequenceFileLogReader.class, Reader.class);
-      }
-
-
-      HLog.Reader reader = logReaderClass.newInstance();
-      reader.init(fs, path, conf);
-      return reader;
-    } catch (IOException e) {
-      throw e;
+      final Path path, Configuration conf) throws IOException {
+    if (logReaderClass == null) {
+      logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+        SequenceFileLogReader.class, Reader.class);
     }
-    catch (Exception e) {
+
+    try {
+      // A hlog file could be under recovery, so it may take several
+      // tries to get it open. Instead of claiming it is corrupted, retry
+      // to open it up to 5 minutes by default.
+      long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
+      long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
+      int nbAttempt = 0;
+      while (true) {
+        try {
+          HLog.Reader reader = logReaderClass.newInstance();
+          reader.init(fs, path, conf);
+          return reader;
+        } catch (IOException e) {
+          String msg = e.getMessage();
+          if (msg != null && msg.contains("Cannot obtain block length")) {
+            if (++nbAttempt == 1) {
+              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
+            }
+            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
+              LOG.error("Can't open after " + nbAttempt + " attempts and "
+                + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
+                + "ms " + " for " + path);
+            } else {
+              try {
+                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+                continue; // retry
+              } catch (InterruptedException ie) {
+                InterruptedIOException iioe = new InterruptedIOException();
+                iioe.initCause(ie);
+                throw iioe;
+              }
+            }
+          }
+          throw e;
+        }
+      }
+    } catch (IOException ie) {
+      throw ie;
+    } catch (Exception e) {
       throw new IOException("Cannot get log reader", e);
     }
   }
-  
+
   /*
    * WAL writer
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index ff41f2fb81b..f0ba96ed3e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -26,7 +26,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 320a6f83dea..21b3023e254 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -880,6 +880,44 @@ public class TestHLogSplit {
     }
   }
 
+  @Test
+  public void testRetryOpenDuringRecovery() throws Exception {
+    generateHLogs(-1);
+
+    fs.initialize(fs.getUri(), conf);
+
+    FileSystem spiedFs = Mockito.spy(fs);
+    // The "Cannot obtain block length" part is very important,
+    // that's how it comes out of HDFS. If HDFS changes the exception
+    // message, this test needs to be adjusted accordingly.
+    //
+    // When DFSClient tries to open a file, HDFS needs to locate
+    // the last block of the file and get its length. However, if the
+    // last block is under recovery, HDFS may have problem to obtain
+    // the block length, in which case, retry may help.
+    Mockito.doAnswer(new Answer<FSDataInputStream>() {
+      private int count = 0;
+
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        if (count++ < 3) {
+          throw new IOException("Cannot obtain block length");
+        }
+        return (FSDataInputStream)invocation.callRealMethod();
+      }
+    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+
+    HLogSplitter logSplitter = new HLogSplitter(
+        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);
+
+    try {
+      logSplitter.splitLog();
+      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
+      assertFalse(fs.exists(HLOGDIR));
+    } catch (IOException e) {
+      fail("There shouldn't be any exception but: " + e.toString());
+    }
+  }
+
   /**
    * Test log split process with fake data and lots of edits to trigger threading
    * issues.
@@ -1330,6 +1368,7 @@
   private Path getLogForRegion(Path rootdir, byte[] table, String region)
   throws IOException {
     Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+    @SuppressWarnings("deprecation")
     Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
         Bytes.toString(region.getBytes())));
     FileStatus [] files = this.fs.listStatus(editsdir);
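Note for reviewers: the heart of this patch is the bounded retry that HLogFactory.createReader() now performs when HDFS reports "Cannot obtain block length" for a file whose last block is still under lease recovery. The sketch below restates that pattern in isolation so it can be read outside the diff; it is illustrative only, not part of the patch, and the names RetryOpenSketch, Opener, openOnce, and timeoutMs are invented for the example.

import java.io.IOException;
import java.io.InterruptedIOException;

/**
 * Illustrative sketch (not part of the patch) of the retry pattern used in
 * HLogFactory.createReader(): retry only the specific "Cannot obtain block
 * length" failure, give up once a deadline has passed, and convert an
 * interrupt into an InterruptedIOException.
 */
public class RetryOpenSketch {
  /** One open attempt, e.g. constructing a reader and calling init(fs, path, conf). */
  interface Opener<T> {
    T openOnce() throws IOException;
  }

  static <T> T openWithRetry(Opener<T> opener, long timeoutMs) throws IOException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    int attempt = 0;
    while (true) {
      try {
        return opener.openOnce();
      } catch (IOException e) {
        String msg = e.getMessage();
        if (msg == null || !msg.contains("Cannot obtain block length")) {
          throw e; // unrelated failure: rethrow immediately, do not mask it
        }
        attempt++;
        if (attempt > 2 && System.currentTimeMillis() > deadline) {
          throw e; // the block never became readable within the timeout
        }
        try {
          // Short pauses at first, slightly longer ones after a few attempts.
          Thread.sleep(attempt < 3 ? 500 : 1000);
        } catch (InterruptedException ie) {
          InterruptedIOException iioe = new InterruptedIOException();
          iioe.initCause(ie);
          throw iioe;
        }
      }
    }
  }
}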