diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 5ac421c318e..c8f4946273d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -297,7 +298,13 @@ public class WALProcedureStore extends ProcedureStoreBase { FileStatus[] oldLogs = getLogFiles(); while (isRunning()) { // Get Log-MaxID and recover lease on old logs - flushLogId = initOldLogs(oldLogs); + try { + flushLogId = initOldLogs(oldLogs); + } catch (FileNotFoundException e) { + LOG.warn("someone else is active and deleted logs. retrying.", e); + oldLogs = getLogFiles(); + continue; + } // Create new state-log if (!rollWriter(flushLogId + 1)) { @@ -928,15 +935,29 @@ public class WALProcedureStore extends ProcedureStoreBase { return Long.parseLong(name.substring(start, end)); } + private static final PathFilter WALS_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return name.startsWith("state-") && name.endsWith(".log"); + } + }; + + private static final Comparator FILE_STATUS_ID_COMPARATOR = + new Comparator() { + @Override + public int compare(FileStatus a, FileStatus b) { + final long aId = getLogIdFromName(a.getPath().getName()); + final long bId = getLogIdFromName(b.getPath().getName()); + return Long.compare(aId, bId); + } + }; + private FileStatus[] getLogFiles() throws IOException { try { - return fs.listStatus(logDir, new PathFilter() { - @Override - public boolean accept(Path path) { - String name = path.getName(); - return name.startsWith("state-") && name.endsWith(".log"); - } - }); + FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER); + Arrays.sort(files, FILE_STATUS_ID_COMPARATOR); + return files; } catch (FileNotFoundException e) { LOG.warn("Log directory not found: " + e.getMessage()); return null; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index f964d869db9..8a244ddc78c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -468,6 +468,48 @@ public class TestWALProcedureStore { assertEquals(1, procStore.getActiveLogs().size()); } + @Test + public void testFileNotFoundDuringLeaseRecovery() throws IOException { + TestProcedure[] procs = new TestProcedure[3]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestProcedure(i + 1, 0); + procStore.insert(procs[i], null); + } + procStore.rollWriterForTesting(); + for (int i = 0; i < procs.length; ++i) { + procStore.update(procs[i]); + procStore.rollWriterForTesting(); + } + procStore.stop(false); + + FileStatus[] status = fs.listStatus(logDir); + assertEquals(procs.length + 2, status.length); + + // simulate another active master removing the wals + procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, + new WALProcedureStore.LeaseRecovery() { + private int count = 0; + + @Override + public void recoverFileLease(FileSystem fs, Path path) throws IOException { + if (++count <= 2) { + fs.delete(path, false); + LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); + throw new FileNotFoundException("test file not found " + path); + } + LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); + } + }); + + final LoadCounter loader = new LoadCounter(); + procStore.start(PROCEDURE_STORE_SLOTS); + procStore.recoverLease(); + procStore.load(loader); + assertEquals(procs.length, loader.getMaxProcId()); + assertEquals(procs.length - 1, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + } + private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { assertTrue(logFile.getLen() > dropBytes);