HBASE-16056 Procedure v2 - fix master crash for FileNotFound

This commit is contained in:
Matteo Bertozzi 2016-06-17 12:43:21 -07:00
parent 61ff6ced5b
commit 568e37d383
2 changed files with 71 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
@ -297,7 +298,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
FileStatus[] oldLogs = getLogFiles(); FileStatus[] oldLogs = getLogFiles();
while (isRunning()) { while (isRunning()) {
// Get Log-MaxID and recover lease on old logs // Get Log-MaxID and recover lease on old logs
try {
flushLogId = initOldLogs(oldLogs); flushLogId = initOldLogs(oldLogs);
} catch (FileNotFoundException e) {
LOG.warn("someone else is active and deleted logs. retrying.", e);
oldLogs = getLogFiles();
continue;
}
// Create new state-log // Create new state-log
if (!rollWriter(flushLogId + 1)) { if (!rollWriter(flushLogId + 1)) {
@ -928,15 +935,29 @@ public class WALProcedureStore extends ProcedureStoreBase {
return Long.parseLong(name.substring(start, end)); return Long.parseLong(name.substring(start, end));
} }
private FileStatus[] getLogFiles() throws IOException { private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
try {
return fs.listStatus(logDir, new PathFilter() {
@Override @Override
public boolean accept(Path path) { public boolean accept(Path path) {
String name = path.getName(); String name = path.getName();
return name.startsWith("state-") && name.endsWith(".log"); return name.startsWith("state-") && name.endsWith(".log");
} }
}); };
/**
 * Orders log files by the numeric id that {@code getLogIdFromName} extracts
 * from each file's name, smallest id first.
 */
private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus lhs, FileStatus rhs) {
        // Long.compare avoids the overflow a subtraction-based comparison could hit.
        return Long.compare(
            getLogIdFromName(lhs.getPath().getName()),
            getLogIdFromName(rhs.getPath().getName()));
      }
    };
private FileStatus[] getLogFiles() throws IOException {
try {
FileStatus[] files = fs.listStatus(logDir, WALS_PATH_FILTER);
Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
return files;
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
LOG.warn("Log directory not found: " + e.getMessage()); LOG.warn("Log directory not found: " + e.getMessage());
return null; return null;

View File

@ -468,6 +468,48 @@ public class TestWALProcedureStore {
assertEquals(1, procStore.getActiveLogs().size()); assertEquals(1, procStore.getActiveLogs().size());
} }
/**
 * Restarts the store with a lease-recovery hook that deletes the file it is
 * handed and throws {@link FileNotFoundException} on its first two calls, then
 * checks that the store still starts and loads the procedures that remain.
 */
@Test
public void testFileNotFoundDuringLeaseRecovery() throws IOException {
  // Insert three procedures, then roll so the updates land in later logs.
  final TestProcedure[] procs = new TestProcedure[3];
  for (int idx = 0; idx < procs.length; idx++) {
    procs[idx] = new TestProcedure(idx + 1, 0);
    procStore.insert(procs[idx], null);
  }
  procStore.rollWriterForTesting();

  // One update per procedure, each followed by its own roll.
  for (TestProcedure proc : procs) {
    procStore.update(proc);
    procStore.rollWriterForTesting();
  }
  procStore.stop(false);

  // NOTE(review): presumably one initial log plus one per roll above -- confirm against setup.
  final FileStatus[] walFiles = fs.listStatus(logDir);
  assertEquals(procs.length + 2, walFiles.length);

  // simulate another active master removing the wals
  final WALProcedureStore.LeaseRecovery deletingRecovery =
      new WALProcedureStore.LeaseRecovery() {
        private int calls = 0;

        @Override
        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
          calls++;
          if (calls > 2) {
            // From the third call on, lease recovery is a logged no-op.
            LOG.debug("Simulate recoverFileLease() at count=" + calls + " for " + path);
            return;
          }
          // First two calls: remove the file and report it as missing.
          fs.delete(path, false);
          LOG.debug("Simulate FileNotFound at count=" + calls + " for " + path);
          throw new FileNotFoundException("test file not found " + path);
        }
      };
  procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, deletingRecovery);

  final LoadCounter loader = new LoadCounter();
  procStore.start(PROCEDURE_STORE_SLOTS);
  procStore.recoverLease();
  procStore.load(loader);

  assertEquals(procs.length, loader.getMaxProcId());
  assertEquals(procs.length - 1, loader.getLoadedCount());
  assertEquals(0, loader.getCorruptedCount());
}
private void corruptLog(final FileStatus logFile, final long dropBytes) private void corruptLog(final FileStatus logFile, final long dropBytes)
throws IOException { throws IOException {
assertTrue(logFile.getLen() > dropBytes); assertTrue(logFile.getLen() > dropBytes);